Repository: giraph
Updated Branches:
  refs/heads/trunk f9dc6b59e -> d7e4bde11


http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestConnectedComponents.java
 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestConnectedComponents.java
new file mode 100644
index 0000000..c5c91ce
--- /dev/null
+++ 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestConnectedComponents.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.prepare_graph;
+
+import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.library.VertexSuppliers;
+import 
org.apache.giraph.block_app.library.prepare_graph.vertex.ConnectedComponentVertexValue;
+import 
org.apache.giraph.block_app.library.prepare_graph.vertex.WeaklyConnectedComponentVertexValue;
+import 
org.apache.giraph.block_app.library.prepare_graph.vertex.WeaklyConnectedComponentVertexValueImpl;
+import org.apache.giraph.block_app.test_setup.NumericTestGraph;
+import org.apache.giraph.block_app.test_setup.graphs.Small1GraphInit;
+import org.apache.giraph.block_app.test_setup.graphs.Small2GraphInit;
+import 
org.apache.giraph.block_app.test_setup.graphs.SmallDirectedForestGraphInit;
+import 
org.apache.giraph.block_app.test_setup.graphs.SmallDirectedTreeGraphInit;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestConnectedComponents {
+
+  private static final
+  SupplierFromVertex<LongWritable, LongWritable, Writable, LongWritable> 
GET_COMPONENT =
+    VertexSuppliers.<LongWritable, LongWritable, 
Writable>vertexValueSupplier();
+
+  private static final
+  ConsumerWithVertex<LongWritable, LongWritable, Writable, LongWritable> 
SET_COMPONENT =
+    (vertex, value) -> vertex.getValue().set(value.get());
+
+
+  private <V extends Writable>
+  NumericTestGraph<LongWritable, V, NullWritable> createEmptyGraph(Class<V> 
vertexValue) {
+    GiraphConfiguration conf = new GiraphConfiguration();
+
+    GiraphConstants.VERTEX_ID_CLASS.set(conf, LongWritable.class);
+    GiraphConstants.VERTEX_VALUE_CLASS.set(conf, vertexValue);
+    GiraphConstants.EDGE_VALUE_CLASS.set(conf, NullWritable.class);
+
+    NumericTestGraph<LongWritable, V, NullWritable> graph = new 
NumericTestGraph<>(conf);
+    return graph;
+  }
+
+  private NumericTestGraph<LongWritable, LongWritable, NullWritable> 
createEmptyGraph() {
+    return createEmptyGraph(LongWritable.class);
+  }
+
+  @Test
+  public void testCC() throws Exception {
+    NumericTestGraph<LongWritable, LongWritable, NullWritable> graph = 
createEmptyGraph();
+    new Small1GraphInit<LongWritable, LongWritable, 
NullWritable>().modifyGraph(graph);
+
+    Block ccBlock = UndirectedConnectedComponents.calculateConnectedComponents(
+        100, GET_COMPONENT, SET_COMPONENT);
+    LocalBlockRunner.runBlock(graph.getTestGraph(), ccBlock, new Object());
+
+    for (int i : new int[] {0, 1, 2, 3, 4, 5}) {
+      Assert.assertEquals(0, graph.getValue(i).get());
+    }
+    for (int i : new int[] {6}) {
+      Assert.assertEquals(6, graph.getValue(i).get());
+    }
+
+    Block keepLargestBlock = 
UndirectedConnectedComponents.calculateAndKeepLargestComponent(
+        100, GET_COMPONENT, SET_COMPONENT);
+
+    LocalBlockRunner.runBlock(graph.getTestGraph(), keepLargestBlock, new 
Object());
+
+    for (int i : new int[] {0, 1, 2, 3, 4, 5}) {
+      Assert.assertEquals(0, graph.getValue(i).get());
+    }
+
+    for (int i : new int[] {6}) {
+      Assert.assertNull(graph.getVertex(i));
+    }
+  }
+
+  @Test
+  public void testMultipleComponentCC() throws Exception {
+    NumericTestGraph<LongWritable, LongWritable, NullWritable> graph = 
createEmptyGraph();
+    new Small2GraphInit<LongWritable, LongWritable, 
NullWritable>().modifyGraph(graph);
+
+    Block ccBlock = UndirectedConnectedComponents.calculateConnectedComponents(
+        100, GET_COMPONENT, SET_COMPONENT);
+    LocalBlockRunner.runBlock(graph.getTestGraph(), ccBlock, new Object());
+
+    for (int i : new int[] {0, 1, 2}) {
+      Assert.assertEquals(0, graph.getValue(i).get());
+    }
+    for (int i : new int[] {3, 4, 5}) {
+      Assert.assertEquals(3, graph.getValue(i).get());
+    }
+    for (int i : new int[] {6}) {
+      Assert.assertEquals(6, graph.getValue(i).get());
+    }
+
+    Block keepAbove3 = 
UndirectedConnectedComponents.calculateAndKeepComponentAboveThreshold(
+        100, 3, GET_COMPONENT, SET_COMPONENT);
+
+    LocalBlockRunner.runBlock(graph.getTestGraph(), keepAbove3, new Object());
+
+    for (int i : new int[] {0, 1, 2}) {
+      Assert.assertEquals(0, graph.getValue(i).get());
+    }
+    for (int i : new int[] {3, 4, 5}) {
+      Assert.assertEquals(3, graph.getValue(i).get());
+    }
+
+    for (int i : new int[] {6}) {
+      Assert.assertNull(graph.getVertex(i));
+    }
+
+    Block keepAbove4 = 
UndirectedConnectedComponents.calculateAndKeepComponentAboveThreshold(
+        100, 4, GET_COMPONENT, SET_COMPONENT);
+
+    LocalBlockRunner.runBlock(graph.getTestGraph(), keepAbove4, new Object());
+
+    for (int i : new int[] {0, 1, 2, 3, 4, 5}) {
+      Assert.assertNull(graph.getVertex(i));
+    }
+  }
+
+  @Test
+  public void testWeaklyCCOnTree() {
+    NumericTestGraph<LongWritable, WeaklyConnectedComponentVertexValueImpl, 
NullWritable> graph =
+        createEmptyGraph(WeaklyConnectedComponentVertexValueImpl.class);
+    new SmallDirectedTreeGraphInit<LongWritable, 
WeaklyConnectedComponentVertexValueImpl, NullWritable>().modifyGraph(graph);
+
+    Block weaklyCC = WeaklyConnectedComponents.calculateConnectedComponents(
+        200,
+        ConnectedComponentVertexValue.getComponentSupplier(),
+        ConnectedComponentVertexValue.setComponentConsumer(),
+        WeaklyConnectedComponentVertexValue.getEdgeIdsSupplier(),
+        WeaklyConnectedComponentVertexValue.setEdgeIdsConsumer(),
+        false);
+
+    LocalBlockRunner.runBlock(graph.getTestGraph(), weaklyCC, new Object());
+
+    for (int i : new int[] {0, 1, 2, 3, 4, 5, 6}) {
+      Assert.assertEquals(0, graph.getValue(i).getComponent());
+    }
+  }
+
+  @Test
+  public void testWeaklyCCOnForest() {
+    NumericTestGraph<LongWritable, WeaklyConnectedComponentVertexValueImpl, 
NullWritable> graph =
+        createEmptyGraph(WeaklyConnectedComponentVertexValueImpl.class);
+    new SmallDirectedForestGraphInit<LongWritable, 
WeaklyConnectedComponentVertexValueImpl, NullWritable>().modifyGraph(graph);
+
+    Block weaklyCC = WeaklyConnectedComponents.calculateConnectedComponents(
+        200,
+        ConnectedComponentVertexValue.getComponentSupplier(),
+        ConnectedComponentVertexValue.setComponentConsumer(),
+        WeaklyConnectedComponentVertexValue.getEdgeIdsSupplier(),
+        WeaklyConnectedComponentVertexValue.setEdgeIdsConsumer(),
+        false);
+
+    LocalBlockRunner.runBlock(graph.getTestGraph(), weaklyCC, new Object());
+
+    for (int i : new int[] {0, 1, 2, 3}) {
+      Assert.assertEquals(0, graph.getValue(i).getComponent());
+    }
+
+    for (int i : new int[] {4, 5}) {
+      Assert.assertEquals(4, graph.getValue(i).getComponent());
+    }
+
+    for (int i : new int[] {6, 7, 8}) {
+      Assert.assertEquals(6, graph.getValue(i).getComponent());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestPrepareGraph.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestPrepareGraph.java
 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestPrepareGraph.java
new file mode 100644
index 0000000..bd4957b
--- /dev/null
+++ 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestPrepareGraph.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.prepare_graph;
+
+import org.apache.giraph.block_app.framework.AbstractBlockFactory;
+import org.apache.giraph.block_app.framework.BlockUtils;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.SequenceBlock;
+import 
org.apache.giraph.block_app.library.prepare_graph.TestPrepareGraph.TmpBlockFactory.TmpRunBlockFactory;
+import 
org.apache.giraph.block_app.library.prepare_graph.TestPrepareGraph.TmpBlockFactory.TmpRunBlockFactory2;
+import org.apache.giraph.block_app.test_setup.NumericTestGraph;
+import org.apache.giraph.block_app.test_setup.TestGraphModifier;
+import org.apache.giraph.block_app.test_setup.TestGraphUtils;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.types.ops.LongTypeOps;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Assert;
+import org.junit.experimental.theories.DataPoints;
+import org.junit.experimental.theories.Theories;
+import org.junit.experimental.theories.Theory;
+import org.junit.runner.RunWith;
+
+@RunWith(Theories.class)
+public class TestPrepareGraph {
+  public static @DataPoints boolean[] booleanOptions = {false, true};
+
+  @Theory
+  public void test1(boolean fullGiraphEnv) throws Exception {
+    TestGraphUtils.runTest(
+        new CreateTestGraph(),
+        (graph) -> {
+          Assert.assertTrue(connected(graph, 0, 1));
+          Assert.assertTrue(connected(graph, 1, 0));
+          Assert.assertTrue(!connected(graph, 1, 2));
+          Assert.assertNull(graph.getVertex(2));
+          Assert.assertNull(graph.getVertex(3));
+        },
+        (conf) -> {
+          BlockUtils.setBlockFactoryClass(conf, TmpRunBlockFactory.class);
+          TestGraphUtils.USE_FULL_GIRAPH_ENV_IN_TESTS.set(conf, fullGiraphEnv);
+        });
+  }
+
+  @Theory
+  public void test2(boolean fullGiraphEnv) throws Exception {
+    TestGraphUtils.runTest(
+        new CreateTestGraph(),
+        (graph) -> {
+          Assert.assertTrue(connected(graph, 0, 1));
+          Assert.assertTrue(connected(graph, 1, 0));
+          Assert.assertTrue(connected(graph, 1, 2));
+          Assert.assertTrue(connected(graph, 2, 1));
+          Assert.assertNull(graph.getVertex(3));
+        },
+        (conf) -> {
+          BlockUtils.setBlockFactoryClass(conf, TmpRunBlockFactory2.class);
+          TestGraphUtils.USE_FULL_GIRAPH_ENV_IN_TESTS.set(conf, fullGiraphEnv);
+        });
+  }
+
+  private static boolean connected(
+      NumericTestGraph<LongWritable, NullWritable, NullWritable> graph, int 
from,
+      int to) {
+    for (Edge<LongWritable, NullWritable> edge : 
graph.getVertex(from).getEdges()) {
+      if (edge.getTargetVertexId().get() == to) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private static class CreateTestGraph
+      implements TestGraphModifier<LongWritable, NullWritable, NullWritable> {
+    @Override
+    public void modifyGraph(NumericTestGraph<LongWritable, NullWritable, 
NullWritable> graph) {
+      graph.addEdge(0, 1);
+      graph.addEdge(1, 0);
+      graph.addEdge(1, 2);
+    }
+  }
+
+
+  /**
+   * class used for testing for building the block to test
+   */
+  public static abstract class TmpBlockFactory extends 
AbstractBlockFactory<Object> {
+    @Override
+    public Object createExecutionStage(GiraphConfiguration conf) {
+      return new Object();
+    }
+
+    @Override
+    protected Class<LongWritable> getVertexIDClass(GiraphConfiguration conf) {
+      return LongWritable.class;
+    }
+
+    @Override
+    protected Class<NullWritable> getVertexValueClass(GiraphConfiguration 
conf) {
+      return NullWritable.class;
+    }
+
+    @Override
+    protected Class<NullWritable> getEdgeValueClass(GiraphConfiguration conf) {
+      return NullWritable.class;
+    }
+
+    /**
+     * Temporary factory that creates a sequence of pieces to remove 
asymmetric edges
+     * and standing alone vertices.(Used for testing)
+     */
+    public static class TmpRunBlockFactory extends TmpBlockFactory {
+      @Override
+      public Block createBlock(GiraphConfiguration conf) {
+        return new 
SequenceBlock(PrepareGraphPieces.removeAsymEdges(LongTypeOps.INSTANCE),
+            PrepareGraphPieces.removeStandAloneVertices());
+      }
+    }
+
+    /**
+     * Temporary factory that creates a sequence of pieces to make the graph 
symmetric
+     * and remove standing alone vertices.(Used for testing)
+     */
+    public static class TmpRunBlockFactory2 extends TmpBlockFactory {
+      // relies on shouldCreateVertexOnMsgs=true
+
+      @Override
+      public Block createBlock(GiraphConfiguration conf) {
+        return new 
SequenceBlock(PrepareGraphPieces.makeSymmetricUnweighted(LongTypeOps.INSTANCE),
+            PrepareGraphPieces.removeStandAloneVertices());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-block-app/pom.xml b/giraph-block-app/pom.xml
index a05c1c5..3524e19 100644
--- a/giraph-block-app/pom.xml
+++ b/giraph-block-app/pom.xml
@@ -110,6 +110,10 @@ under the License.
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>annotations</artifactId>
+    </dependency>
 
     <!-- runtime dependency -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java
index be5d4fe..815cbfa 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java
@@ -48,24 +48,26 @@ import org.apache.hadoop.io.Writable;
  * In Giraph, for each reducer there is a worker machine which is it's owner,
  * which does partial aggregation for it. So if we have only single huge
  * reducer - other workers will have to wait, while that single worker is doing
- * huge reducing operation.
+ * huge reducing operation. Additionally single reducer should be smaller then
+ * max netty message, which is 1MB.
  * On the other hand, each reducer has a meaningful overhead, so we should try
- * to keep number of reducers as low as possible (in total less then 10k is a
- * good number).
- * What we want is to split such huge reducers into slightly more then number
- * of worker reducers, and NUM_REDUCERS = 50000 is used here as a good middle
- * ground.
+ * to keep number of reducers as low as possible.
+ *
+ * By default we are being conservative, to keep individual reducers small,
+ * with striping into 500k reducers by default. If you know exact sizes of
+ * your objects you can specify exact number you want.
  *
  * So when we have huge array, we don't want one reducer/broadcast for each
  * element, but we also don't want one reducer/broadcast for the whole array.
  *
  * This class allows transparent split into reasonable number of reducers
- * (~50000), which solves both of the above issues.
+ * (~500k), which solves both of the above issues.
  */
 public class HugeArrayUtils {
-  // Striping perfectly reducers of up to 25GB (i.e. 500KB * NUM_STRIPES).
+  // Even with 100GB object, average stripe will be 200KB on average,
+  // keeping outliers mostly under 1MB limit
   private static final IntConfOption NUM_STRIPES = new IntConfOption(
-      "giraph.reducers.HugeArrayUtils.num_stripes", 50000,
+      "giraph.reducers.HugeArrayUtils.num_stripes", 500000,
       "Number of distict reducers to create. If array is smaller then this" +
       "number, each element will be it's own reducer");
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java
index 4886c80..c5d2fb1 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java
@@ -180,7 +180,9 @@ public class NumericTestGraph<I extends WritableComparable,
    */
   public void addEdge(Number fromVertex, Number toVertex, E edgeValue) {
     testGraph.addEdge(
-        numberToVertexId(fromVertex), numberToVertexId(toVertex), edgeValue);
+        numberToVertexId(fromVertex),
+        numberToVertexId(toVertex),
+        edgeValueOrCreate(edgeValue));
   }
 
   /**
@@ -310,6 +312,11 @@ public class NumericTestGraph<I extends WritableComparable,
       numberToEdgeValue.apply(edgeValue) : getConf().createEdgeValue();
   }
 
+  public E edgeValueOrCreate(E edgeValue) {
+    return edgeValue != null ? edgeValue : getConf().createEdgeValue();
+  }
+
+
   public Vertex<I, V, E> createVertex() {
     return getConf().createVertex();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small1GraphInit.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small1GraphInit.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small1GraphInit.java
index ecec024..b014791 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small1GraphInit.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small1GraphInit.java
@@ -40,23 +40,30 @@ public class Small1GraphInit<I extends WritableComparable,
     V extends Writable, E extends Writable>
     implements TestGraphModifier<I, V, E> {
 
-  private final Supplier<V> valueSupplier;
   private final Supplier<E> edgeSupplier;
 
-  public Small1GraphInit(
-      Supplier<V> valueSupplier, Supplier<E> edgeSupplier) {
-    this.valueSupplier = valueSupplier;
+  public Small1GraphInit() {
+    this(null);
+  }
+
+  public Small1GraphInit(Supplier<E> edgeSupplier) {
     this.edgeSupplier = edgeSupplier;
   }
 
   @Override
   public void modifyGraph(NumericTestGraph<I, V, E> graph) {
-    graph.addVertex(0, valueSupplier.get(), edgeSupplier, 1, 2);
-    graph.addVertex(1, valueSupplier.get(), edgeSupplier, 0, 2);
-    graph.addVertex(2, valueSupplier.get(), edgeSupplier, 0, 1, 3);
-    graph.addVertex(3, valueSupplier.get(), edgeSupplier, 2, 4, 5);
-    graph.addVertex(4, valueSupplier.get(), edgeSupplier, 3, 5);
-    graph.addVertex(5, valueSupplier.get(), edgeSupplier, 3, 4);
-    graph.addVertex(6, valueSupplier.get(), edgeSupplier);
+    graph.addSymmetricEdge(0, 1, createEdgeValue());
+    graph.addSymmetricEdge(0, 2, createEdgeValue());
+    graph.addSymmetricEdge(1, 2, createEdgeValue());
+    graph.addSymmetricEdge(2, 3, createEdgeValue());
+    graph.addSymmetricEdge(3, 4, createEdgeValue());
+    graph.addSymmetricEdge(3, 5, createEdgeValue());
+    graph.addSymmetricEdge(4, 5, createEdgeValue());
+
+    graph.addVertex(6);
+  }
+
+  private E createEdgeValue() {
+    return edgeSupplier != null ? edgeSupplier.get() : null;
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small2GraphInit.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small2GraphInit.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small2GraphInit.java
index eb38c45..bfa603c 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small2GraphInit.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small2GraphInit.java
@@ -39,25 +39,30 @@ import org.apache.hadoop.io.WritableComparable;
 public class Small2GraphInit<I extends WritableComparable,
     V extends Writable, E extends Writable>
     implements TestGraphModifier<I, V, E> {
-
-  private final Supplier<V> valueSupplier;
   private final Supplier<E> edgeSupplier;
 
-  public Small2GraphInit(
-      Supplier<V> valueSupplier, Supplier<E> edgeSupplier) {
-    this.valueSupplier = valueSupplier;
+  public Small2GraphInit() {
+    this(null);
+  }
+
+  public Small2GraphInit(Supplier<E> edgeSupplier) {
     this.edgeSupplier = edgeSupplier;
   }
 
   @Override
   public void modifyGraph(NumericTestGraph<I, V, E> graph) {
-    graph.addVertex(0, valueSupplier.get(), edgeSupplier, 1, 2);
-    graph.addVertex(1, valueSupplier.get(), edgeSupplier, 0, 2);
-    graph.addVertex(2, valueSupplier.get(), edgeSupplier, 0, 1);
-    graph.addVertex(3, valueSupplier.get(), edgeSupplier, 4, 5);
-    graph.addVertex(4, valueSupplier.get(), edgeSupplier, 3, 5);
-    graph.addVertex(5, valueSupplier.get(), edgeSupplier, 3, 4);
-    graph.addVertex(6, valueSupplier.get(), edgeSupplier);
+    graph.addSymmetricEdge(0, 1, createEdgeValue());
+    graph.addSymmetricEdge(0, 2, createEdgeValue());
+    graph.addSymmetricEdge(1, 2, createEdgeValue());
+    graph.addSymmetricEdge(3, 4, createEdgeValue());
+    graph.addSymmetricEdge(3, 5, createEdgeValue());
+    graph.addSymmetricEdge(4, 5, createEdgeValue());
+
+    graph.addVertex(6);
+  }
+
+  private E createEdgeValue() {
+    return edgeSupplier != null ? edgeSupplier.get() : null;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SmallDirectedForestGraphInit.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SmallDirectedForestGraphInit.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SmallDirectedForestGraphInit.java
new file mode 100644
index 0000000..a2194b9
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SmallDirectedForestGraphInit.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.test_setup.graphs;
+
+import org.apache.giraph.block_app.test_setup.NumericTestGraph;
+import org.apache.giraph.block_app.test_setup.TestGraphModifier;
+import org.apache.giraph.function.Supplier;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+
+/**
+ * Create a directed forest that looks like:
+ *
+ *   0      4     6
+ *  / \     |    / \
+ * 1   2    5   7   8
+ *     |
+ *     3
+ *
+ * Edges are directed from top to bottom.
+ * Vertices with no edges are created.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ */
+public class SmallDirectedForestGraphInit<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    implements TestGraphModifier<I, V, E> {
+
+  private final Supplier<E> edgeSupplier;
+
+  public SmallDirectedForestGraphInit() {
+    this(null);
+  }
+
+  public SmallDirectedForestGraphInit(Supplier<E> edgeSupplier) {
+    this.edgeSupplier = edgeSupplier;
+  }
+
+  @Override
+  public void modifyGraph(NumericTestGraph<I, V, E> graph) {
+    graph.addEdge(0, 1, createEdgeValue());
+    graph.addEdge(0, 2, createEdgeValue());
+    graph.addEdge(2, 3, createEdgeValue());
+    graph.addEdge(4, 5, createEdgeValue());
+    graph.addEdge(6, 7, createEdgeValue());
+    graph.addEdge(6, 8, createEdgeValue());
+
+    graph.addVertex(1);
+    graph.addVertex(3);
+    graph.addVertex(5);
+    graph.addVertex(7);
+    graph.addVertex(8);
+  }
+
+  private E createEdgeValue() {
+    return edgeSupplier != null ? edgeSupplier.get() : null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SmallDirectedTreeGraphInit.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SmallDirectedTreeGraphInit.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SmallDirectedTreeGraphInit.java
new file mode 100644
index 0000000..6522e62
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SmallDirectedTreeGraphInit.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.test_setup.graphs;
+
+import org.apache.giraph.block_app.test_setup.NumericTestGraph;
+import org.apache.giraph.block_app.test_setup.TestGraphModifier;
+import org.apache.giraph.function.Supplier;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+
+/**
+ * Create a directed tree that looks like:
+ *
+ *   0 __
+ *  / \  \
+ * 1   2  6
+ * |   |\
+ * 3   4 5
+ *
+ * Edges are directed from top to bottom.
+ * Vertices with no edges are created.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ */
+public class SmallDirectedTreeGraphInit<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    implements TestGraphModifier<I, V, E> {
+
+  private final Supplier<E> edgeSupplier;
+
+  public SmallDirectedTreeGraphInit() {
+    this(null);
+  }
+
+  public SmallDirectedTreeGraphInit(Supplier<E> edgeSupplier) {
+    this.edgeSupplier = edgeSupplier;
+  }
+
+  @Override
+  public void modifyGraph(NumericTestGraph<I, V, E> graph) {
+    graph.addEdge(0, 1, createEdgeValue());
+    graph.addEdge(0, 2, createEdgeValue());
+    graph.addEdge(0, 6, createEdgeValue());
+    graph.addEdge(1, 3, createEdgeValue());
+    graph.addEdge(2, 4, createEdgeValue());
+    graph.addEdge(2, 5, createEdgeValue());
+
+    graph.addVertex(3);
+    graph.addVertex(4);
+    graph.addVertex(5);
+    graph.addVertex(6);
+  }
+
+  private E createEdgeValue() {
+    return edgeSupplier != null ? edgeSupplier.get() : null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app/src/main/java/org/apache/giraph/object/MultiSizedReusable.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/object/MultiSizedReusable.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/object/MultiSizedReusable.java
new file mode 100644
index 0000000..2a91e8f
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/object/MultiSizedReusable.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.object;
+
+import org.apache.giraph.function.Consumer;
+import org.apache.giraph.function.primitive.Int2ObjFunction;
+import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
+import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
+import org.apache.giraph.types.ops.collections.BasicSet;
+
+import com.google.common.base.Preconditions;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * Holds reusable objects of multiple sizes.
+ * Example usecase, is when we need a hashmap - that we will insert and iterate
+ * on, both clear() and iterate method depend on size. And if we want to reuse
+ * objects, we want to have multiple objects of different sizes, that we will
+ * reuse.
+ *
+ * Instead of creating object for each distinct size, it creates objects with
+ * first larger power of 2.
+ *
+ * @param <T> Type of reusable object
+ */
+public class MultiSizedReusable<T> implements Int2ObjFunction<T> {
+  private final Int2ObjFunction<T> createSized;
+  private final Consumer<T> init;
+  @SuppressWarnings("unchecked")
+  @SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED")
+  private final transient T[] holder = (T[]) new Object[Integer.SIZE];
+
+  // No-arg constructor Kryo can call to initialize holder
+  MultiSizedReusable() {
+    this(null, null);
+  }
+
+  public MultiSizedReusable(Int2ObjFunction<T> createSized, Consumer<T> init) {
+    this.createSized = createSized;
+    this.init = init;
+  }
+
+  @Override
+  public T apply(int size) {
+    Preconditions.checkArgument(size >= 0);
+    int shiftBits = (Integer.SIZE -
+        Integer.numberOfLeadingZeros(Math.max(0, size - 1))) / 2;
+    T result = holder[shiftBits];
+    if (result == null) {
+      if (shiftBits >= 15) {
+        result = createSized.apply(Integer.MAX_VALUE);
+      } else {
+        result = createSized.apply(1 << (shiftBits * 2 + 1));
+      }
+      holder[shiftBits] = result;
+    }
+    if (init != null) {
+      init.apply(result);
+    }
+    return result;
+  }
+
+  public static <I> MultiSizedReusable<BasicSet<I>> createForBasicSet(
+      final PrimitiveIdTypeOps<I> idTypeOps) {
+    return new MultiSizedReusable<>(
+        new Int2ObjFunction<BasicSet<I>>() {
+          @Override
+          public BasicSet<I> apply(int value) {
+            return idTypeOps.createOpenHashSet(value);
+          }
+        },
+        new Consumer<BasicSet<I>>() {
+          @Override
+          public void apply(BasicSet<I> t) {
+            t.clear();
+          }
+        });
+  }
+
+  public static <K, V>
+  MultiSizedReusable<Basic2ObjectMap<K, V>> createForBasic2ObjectMap(
+      final PrimitiveIdTypeOps<K> idTypeOps) {
+    return new MultiSizedReusable<>(
+        new Int2ObjFunction<Basic2ObjectMap<K, V>>() {
+          @Override
+          public Basic2ObjectMap<K, V> apply(int value) {
+            return idTypeOps.create2ObjectOpenHashMap(value, null);
+          }
+        },
+        new Consumer<Basic2ObjectMap<K, V>>() {
+          @Override
+          public void apply(Basic2ObjectMap<K, V> t) {
+            t.clear();
+          }
+        });
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app/src/main/java/org/apache/giraph/object/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/object/package-info.java 
b/giraph-block-app/src/main/java/org/apache/giraph/object/package-info.java
new file mode 100644
index 0000000..bd578ef
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/object/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Object handling related utilities.
+ */
+package org.apache.giraph.object;

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java
 
b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java
index 170f307..021a24c 100644
--- 
a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java
+++ 
b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java
@@ -58,10 +58,9 @@ public class BlockExecutionTest {
     return conf;
   }
 
-  private static TestGraph<LongWritable, LongWritable, NullWritable> 
createTestGraph(
-      GiraphConfiguration conf) {
+  private static TestGraph<LongWritable, LongWritable, NullWritable> 
createTestGraph() {
     TestGraph<LongWritable, LongWritable, NullWritable> graph =
-        new TestGraph<LongWritable, LongWritable, NullWritable>(conf);
+        new TestGraph<LongWritable, LongWritable, NullWritable>(createConf());
     graph.addVertex(new LongWritable(1), new LongWritable());
     graph.addVertex(new LongWritable(2), new LongWritable());
     graph.addVertex(new LongWritable(3), new LongWritable());
@@ -76,8 +75,7 @@ public class BlockExecutionTest {
 
   @Test
   public void testMessageSending() {
-    GiraphConfiguration conf = createConf();
-    TestGraph<LongWritable, LongWritable, NullWritable> graph = 
createTestGraph(conf);
+    TestGraph<LongWritable, LongWritable, NullWritable> graph = 
createTestGraph();
 
     LocalBlockRunner.runBlock(graph, new Piece<WritableComparable, 
LongWritable, Writable, BooleanWritable, Object>() {
       @Override
@@ -119,8 +117,7 @@ public class BlockExecutionTest {
 
   @Test
   public void testReducing() {
-    GiraphConfiguration conf = createConf();
-    TestGraph<LongWritable, LongWritable, NullWritable> graph = 
createTestGraph(conf);
+    TestGraph<LongWritable, LongWritable, NullWritable> graph = 
createTestGraph();
 
     final LongWritable value = new LongWritable();
 
@@ -153,4 +150,29 @@ public class BlockExecutionTest {
 
     Assert.assertEquals(4, value.get());
   }
+
+  public void testVertexRemoval() {
+    TestGraph<LongWritable, LongWritable, NullWritable> graph = 
createTestGraph();
+    LocalBlockRunner.runBlock(graph, new Piece<LongWritable, Writable, 
Writable, NoMessage, Object>() {
+      @Override
+      public VertexSender<LongWritable, Writable, Writable> getVertexSender(
+          final BlockWorkerSendApi<LongWritable, Writable, Writable, 
NoMessage> workerApi,
+          Object executionStage) {
+        return new InnerVertexSender() {
+          @Override
+          public void vertexSend(Vertex<LongWritable, Writable, Writable> 
vertex) {
+            long id = vertex.getId().get();
+            if (id == 1 || id == 3) {
+              workerApi.removeVertexRequest(vertex.getId());
+            }
+          }
+        };
+      }
+    }, new Object());
+
+    Assert.assertNull(graph.getVertex(new LongWritable(1)));
+    Assert.assertNotNull(graph.getVertex(new LongWritable(2)));
+    Assert.assertNull(graph.getVertex(new LongWritable(3)));
+    Assert.assertNotNull(graph.getVertex(new LongWritable(4)));
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumMessageCombiner.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumMessageCombiner.java
 
b/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumMessageCombiner.java
index 04f6bdf..c8dc288 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumMessageCombiner.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumMessageCombiner.java
@@ -23,10 +23,12 @@ import org.apache.hadoop.io.WritableComparable;
 
 /**
  * A combiner that sums double-valued messages
+ *
+ * Use SumMessageCombiner.DOUBLE instead.
  */
+@Deprecated()
 public class DoubleSumMessageCombiner
-    extends
-    MessageCombiner<WritableComparable, DoubleWritable> {
+    implements MessageCombiner<WritableComparable, DoubleWritable> {
   @Override
   public void combine(WritableComparable vertexIndex,
       DoubleWritable originalMessage, DoubleWritable messageToCombine) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumMessageCombiner.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumMessageCombiner.java
 
b/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumMessageCombiner.java
index 3015e2b..4db0097 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumMessageCombiner.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumMessageCombiner.java
@@ -23,10 +23,12 @@ import org.apache.hadoop.io.WritableComparable;
 
 /**
  * A combiner that sums float-valued messages
+ *
+ * Use SumMessageCombiner.DOUBLE instead.
  */
+@Deprecated
 public class FloatSumMessageCombiner
-    extends
-    MessageCombiner<WritableComparable, FloatWritable> {
+    implements MessageCombiner<WritableComparable, FloatWritable> {
   @Override
   public void combine(WritableComparable vertexIndex,
       FloatWritable originalMessage, FloatWritable messageToCombine) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-core/src/main/java/org/apache/giraph/combiner/MessageCombiner.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/combiner/MessageCombiner.java 
b/giraph-core/src/main/java/org/apache/giraph/combiner/MessageCombiner.java
index e53ab3f..1a19eee 100644
--- a/giraph-core/src/main/java/org/apache/giraph/combiner/MessageCombiner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/combiner/MessageCombiner.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <I> Vertex id
  * @param <M> Message data
  */
-public abstract class MessageCombiner<I extends WritableComparable,
+public interface MessageCombiner<I extends WritableComparable,
     M extends Writable> {
   /**
    * Combine messageToCombine with originalMessage, by modifying
@@ -44,7 +44,7 @@ public abstract class MessageCombiner<I extends 
WritableComparable,
    *                         (object may be reused - do not reference it or its
    *                         member objects)
    */
-  public abstract void combine(I vertexIndex, M originalMessage,
+  void combine(I vertexIndex, M originalMessage,
       M messageToCombine);
 
   /**
@@ -53,5 +53,5 @@ public abstract class MessageCombiner<I extends 
WritableComparable,
    *
    * @return Initial message
    */
-  public abstract M createInitialMessage();
+  M createInitialMessage();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleMessageCombiner.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleMessageCombiner.java
 
b/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleMessageCombiner.java
index db43008..9bebf81 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleMessageCombiner.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleMessageCombiner.java
@@ -25,8 +25,7 @@ import org.apache.hadoop.io.WritableComparable;
  * MessageCombiner which finds the minimum of {@link DoubleWritable}.
  */
 public class MinimumDoubleMessageCombiner
-    extends
-    MessageCombiner<WritableComparable, DoubleWritable> {
+    implements MessageCombiner<WritableComparable, DoubleWritable> {
   @Override
   public void combine(WritableComparable vertexIndex,
       DoubleWritable originalMessage, DoubleWritable messageToCombine) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntMessageCombiner.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntMessageCombiner.java
 
b/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntMessageCombiner.java
index df80b8f..542f4be 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntMessageCombiner.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntMessageCombiner.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.io.WritableComparable;
  * {@link MessageCombiner} that finds the minimum {@link IntWritable}
  */
 public class MinimumIntMessageCombiner
-    extends MessageCombiner<WritableComparable, IntWritable> {
+    implements MessageCombiner<WritableComparable, IntWritable> {
   @Override
   public void combine(WritableComparable vertexIndex,
       IntWritable originalMessage, IntWritable messageToCombine) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumMessageCombiner.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumMessageCombiner.java
 
b/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumMessageCombiner.java
index e3ae597..cd51bdd 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumMessageCombiner.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumMessageCombiner.java
@@ -23,9 +23,12 @@ import org.apache.hadoop.io.WritableComparable;
 
 /**
  * MessageCombiner which sums up {@link IntWritable} message values.
+ *
+ * Use SumMessageCombiner.INT instead.
  */
+@Deprecated
 public class SimpleSumMessageCombiner
-    extends MessageCombiner<WritableComparable, IntWritable> {
+    implements MessageCombiner<WritableComparable, IntWritable> {
 
   @Override
   public void combine(WritableComparable vertexIndex,

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-core/src/main/java/org/apache/giraph/combiner/SumMessageCombiner.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/combiner/SumMessageCombiner.java 
b/giraph-core/src/main/java/org/apache/giraph/combiner/SumMessageCombiner.java
new file mode 100644
index 0000000..9395142
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/combiner/SumMessageCombiner.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.combiner;
+
+import org.apache.giraph.types.ops.DoubleTypeOps;
+import org.apache.giraph.types.ops.FloatTypeOps;
+import org.apache.giraph.types.ops.IntTypeOps;
+import org.apache.giraph.types.ops.LongTypeOps;
+import org.apache.giraph.types.ops.NumericTypeOps;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Message combiner which sums all messages.
+ *
+ * @param <M> Message type
+ */
+public class SumMessageCombiner<M extends Writable>
+  implements MessageCombiner<WritableComparable, M> {
+  /** DoubleWritable specialization */
+  public static final SumMessageCombiner<DoubleWritable> DOUBLE =
+      new SumMessageCombiner<>(DoubleTypeOps.INSTANCE);
+  /** DoubleWritable specialization */
+  public static final SumMessageCombiner<FloatWritable> FLOAT =
+      new SumMessageCombiner<>(FloatTypeOps.INSTANCE);
+  /** LongWritable specialization */
+  public static final SumMessageCombiner<LongWritable> LONG =
+      new SumMessageCombiner<>(LongTypeOps.INSTANCE);
+  /** IntWritable specialization */
+  public static final SumMessageCombiner<IntWritable> INT =
+      new SumMessageCombiner<>(IntTypeOps.INSTANCE);
+
+  /** Value type operations */
+  private final NumericTypeOps<M> typeOps;
+
+  /**
+   * Constructor
+   * @param typeOps Value type operations
+   */
+  public SumMessageCombiner(NumericTypeOps<M> typeOps) {
+    this.typeOps = typeOps;
+  }
+
+  @Override
+  public void combine(
+      WritableComparable vertexIndex, M originalMessage, M messageToCombine) {
+    typeOps.plusInto(originalMessage, messageToCombine);
+  }
+
+  @Override
+  public M createInitialMessage() {
+    return typeOps.createZero();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxPairReducer.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxPairReducer.java 
b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxPairReducer.java
new file mode 100644
index 0000000..8527004
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxPairReducer.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.reducers.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.reducers.ReduceSameTypeOperation;
+import org.apache.giraph.types.ops.TypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.writable.tuple.PairWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+
+/**
+ * Aggregating PairWritable<L, R>, by taking pair with largest second value.
+ *
+ * @param <L> Type of the left value
+ * @param <R> Type of the right value
+ */
+public class MaxPairReducer<L extends Writable, R extends WritableComparable>
+    extends ReduceSameTypeOperation<PairWritable<L, R>> {
+
+  /** Left value TypeOps */
+  private TypeOps<L> leftTypeOps;
+  /** Right value TypeOps */
+  private TypeOps<R> rightTypeOps;
+
+  /** Constructor used for deserialization only */
+  public MaxPairReducer() {
+  }
+
+  /**
+   * Constructor
+   * @param leftTypeOps Left value TypeOps
+   * @param rightTypeOps Right value TypeOps
+   */
+  public MaxPairReducer(TypeOps<L> leftTypeOps, TypeOps<R> rightTypeOps) {
+    this.leftTypeOps = leftTypeOps;
+    this.rightTypeOps = rightTypeOps;
+  }
+
+  @Override
+  public PairWritable<L, R> reduce(
+      PairWritable<L, R> curValue, PairWritable<L, R> valueToReduce) {
+    if (valueToReduce.getRight().compareTo(curValue.getRight()) > 0) {
+      leftTypeOps.set(curValue.getLeft(), valueToReduce.getLeft());
+      rightTypeOps.set(curValue.getRight(), valueToReduce.getRight());
+    }
+    return curValue;
+  }
+
+  @Override
+  public PairWritable<L, R> createInitialValue() {
+    return new PairWritable<L, R>(
+        leftTypeOps.create(), rightTypeOps.create());
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    TypeOpsUtils.writeTypeOps(leftTypeOps, out);
+    TypeOpsUtils.writeTypeOps(rightTypeOps, out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    leftTypeOps = TypeOpsUtils.readTypeOps(in);
+    rightTypeOps = TypeOpsUtils.readTypeOps(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
 
b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
index 194bb5e..996fee8 100644
--- 
a/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
+++ 
b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
@@ -144,7 +144,7 @@ public class TestComputationCombinerTypes {
           IntWritable> { }
 
   private static class NoOpMessageCombiner<I extends WritableComparable,
-      M extends Writable> extends MessageCombiner<I, M> {
+      M extends Writable> implements MessageCombiner<I, M> {
     @Override
     public void combine(I vertexIndex, M originalMessage, M messageToCombine) {
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java 
b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
index 8a034c2..d56c0fb 100644
--- a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
+++ b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
@@ -195,7 +195,7 @@ public class TestSwitchClasses {
   }
 
   public static class MinimumMessageCombiner
-      extends MessageCombiner<IntWritable,
+      implements MessageCombiner<IntWritable,
                   IntWritable> {
     @Override
     public void combine(IntWritable vertexIndex, IntWritable originalMessage,
@@ -211,7 +211,7 @@ public class TestSwitchClasses {
   }
 
   public static class SumMessageCombiner
-      extends MessageCombiner<IntWritable, IntWritable> {
+      implements MessageCombiner<IntWritable, IntWritable> {
     @Override
     public void combine(IntWritable vertexIndex, IntWritable originalMessage,
         IntWritable messageToCombine) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java
----------------------------------------------------------------------
diff --git 
a/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java
 
b/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java
index 41726a9..ea1a74d 100644
--- 
a/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java
+++ 
b/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java
@@ -64,7 +64,7 @@ public class TestComputationTypes {
      * Matches the {@link GeneratedComputationMatch}
      */
     public static class GeneratedVertexMatchMessageCombiner
-        extends
+        implements
         MessageCombiner<LongWritable, FloatWritable> {
       @Override
       public void combine(LongWritable vertexIndex,
@@ -82,7 +82,7 @@ public class TestComputationTypes {
      * Mismatches the {@link GeneratedComputationMatch}
      */
     public static class GeneratedVertexMismatchMessageCombiner
-        extends
+        implements
         MessageCombiner<LongWritable, DoubleWritable> {
       @Override
       public void combine(LongWritable vertexIndex,

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ceca219..392b92c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -270,6 +270,8 @@ under the License.
     <project.enforcer.skip>false</project.enforcer.skip>
     <project.enforcer.fail>true</project.enforcer.fail>
     <project.build.targetJdk>1.7</project.build.targetJdk>
+    <project.build.javaHome>${env.JAVA_HOME}</project.build.javaHome>
+    <project.enforcer.minJdk>1.7</project.enforcer.minJdk>
     
<giraph.maven.dependency.plugin.skip>false</giraph.maven.dependency.plugin.skip>
     
<giraph.maven.duplicate.finder.skip>false</giraph.maven.duplicate.finder.skip>
     <!-- This lets modules skip unit tests. More details: GIRAPH-957 --> 
@@ -567,7 +569,9 @@ under the License.
           <configuration>
             <source>${project.build.targetJdk}</source>
             <target>${project.build.targetJdk}</target>
-          </configuration>
+            <executable>${project.build.javaHome}/bin/javac</executable>
+            <fork>true</fork> 
+         </configuration>
         </plugin>
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
@@ -626,7 +630,7 @@ under the License.
                 <version>${project.maven.version}</version>
               </requireMavenVersion>
               <requireJavaVersion>
-                <version>${project.build.targetJdk}</version>
+                <version>${project.enforcer.minJdk}</version>
               </requireJavaVersion>
             </rules>
           </configuration>
@@ -665,6 +669,9 @@ under the License.
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-javadoc-plugin</artifactId>
           <version>2.9</version>
+          <configuration>
+            
<javadocExecutable>${project.build.javaHome}/bin/javadoc</javadocExecutable>
+          </configuration>
         </plugin>
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
@@ -774,6 +781,15 @@ under the License.
             </execution>
           </executions>
         </plugin>
+
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-surefire-plugin</artifactId>
+          <configuration>
+            <jvm>${project.build.javaHome}/bin/java</jvm>
+            <forkMode>once</forkMode>
+          </configuration>
+        </plugin>
       </plugins>
     </pluginManagement>
     <plugins>
@@ -1061,6 +1077,9 @@ under the License.
 
     <profile>
       <id>hadoop_facebook</id>
+      <modules>
+        <module>giraph-block-app-8</module>
+      </modules>
       <properties>
         <hadoop.version>0.20.0</hadoop.version>
         
<munge.symbols>HADOOP_NON_SECURE,HADOOP_NON_JOBCONTEXT_IS_INTERFACE,HADOOP_JOB_ID_AVAILABLE,STATIC_SASL_SYMBOL</munge.symbols>
@@ -1719,6 +1738,11 @@ under the License.
       </dependency>
       <dependency>
         <groupId>org.apache.giraph</groupId>
+        <artifactId>giraph-block-app-8</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.giraph</groupId>
         <artifactId>giraph-examples</artifactId>
         <version>${project.version}</version>
       </dependency>

Reply via email to