Updated Branches:
  refs/heads/trunk 621a022c0 -> 96fd05385

GIRAPH-543: Fix PageRankBenchmark and make WeightedPageRankBenchmark (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/96fd0538
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/96fd0538
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/96fd0538

Branch: refs/heads/trunk
Commit: 96fd05385199460bb731270e8dc42a5408175ab9
Parents: 621a022
Author: Maja Kabiljo <[email protected]>
Authored: Wed Apr 10 15:08:32 2013 -0700
Committer: Maja Kabiljo <[email protected]>
Committed: Wed Apr 10 15:11:15 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../apache/giraph/benchmark/BenchmarkOption.java   |   28 +++
 .../apache/giraph/benchmark/PageRankBenchmark.java |  145 ++-----------
 .../apache/giraph/benchmark/PageRankVertex.java    |   24 ++-
 .../benchmark/WeightedPageRankBenchmark.java       |  165 +++++++++++++++
 .../giraph/benchmark/WeightedPageRankVertex.java   |   66 ++++++
 .../apache/giraph/combiner/FloatSumCombiner.java   |   39 ++++
 .../org/apache/giraph/edge/IntNullArrayEdges.java  |  150 +++++++++++++
 .../io/formats/PseudoRandomEdgeInputFormat.java    |    9 +-
 .../PseudoRandomIntNullLocalEdgesHelper.java       |   81 +++++++
 .../PseudoRandomIntNullVertexInputFormat.java      |  153 +++++++++++++
 .../giraph/io/formats/PseudoRandomUtils.java       |   49 +++++
 .../io/formats/PseudoRandomVertexInputFormat.java  |    9 +-
 .../org/apache/giraph/io/TestJsonBase64Format.java |   14 +-
 14 files changed, 778 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index d9c88ec..babbb88 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-543: Fix PageRankBenchmark and make WeightedPageRankBenchmark (majakabiljo)
+
   GIRAPH-615: Add support for multithreaded output (majakabiljo)
 
   GIRAPH-612: Improve website for upcoming release (aching)

http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/benchmark/BenchmarkOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/BenchmarkOption.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/BenchmarkOption.java
index 0771ca2..23c614b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/BenchmarkOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/BenchmarkOption.java
@@ -50,6 +50,11 @@ public class BenchmarkOption {
       new BenchmarkOption("e", "edgesPerVertex", true,
           "Edges per vertex",
           "Need to set the number of edges per vertex (-e)");
+  /** Option for minimum ratio of partition-local edges */
+  public static final BenchmarkOption LOCAL_EDGES_MIN_RATIO =
+      new BenchmarkOption(
+          "l", "localEdgesMinRatio", true,
+          "Minimum ratio of partition-local edges (default is 0)");
 
   /** Short option */
   private String shortOption;
@@ -187,4 +192,27 @@ public class BenchmarkOption {
   public long getOptionLongValue(CommandLine cmd) {
     return Long.parseLong(getOptionValue(cmd));
   }
+
+  /**
+   * Retrieve the argument of this option as float value
+   *
+   * @param cmd CommandLine
+   * @return Value of the argument as float value
+   */
+  public float getOptionFloatValue(CommandLine cmd) {
+    return Float.parseFloat(getOptionValue(cmd));
+  }
+
+  /**
+   * Retrieve the argument of this option as float value,
+   * or default value if option is not set
+   *
+   * @param cmd CommandLine
+   * @param defaultValue Default value
+   * @return Value of the argument as float value,
+   * or default value if option is not set
+   */
+  public float getOptionFloatValue(CommandLine cmd, float defaultValue) {
+    return optionTurnedOn(cmd) ? getOptionFloatValue(cmd) : defaultValue;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
index fbb2516..77fa83c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
@@ -15,149 +15,50 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.giraph.benchmark;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.giraph.combiner.FloatSumCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.combiner.DoubleSumCombiner;
-import org.apache.giraph.edge.ArrayListEdges;
-import org.apache.giraph.edge.ByteArrayEdges;
-import org.apache.giraph.edge.HashMapEdges;
-import org.apache.giraph.edge.LongDoubleArrayEdges;
+import org.apache.giraph.edge.IntNullArrayEdges;
 import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
-import org.apache.giraph.io.formats.JsonBase64VertexOutputFormat;
-import org.apache.giraph.io.formats.PseudoRandomEdgeInputFormat;
-import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
-import org.apache.giraph.partition.SimpleLongRangePartitionerFactory;
+import org.apache.giraph.io.formats.PseudoRandomIntNullVertexInputFormat;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.log4j.Logger;
 
 import com.google.common.collect.Sets;
 
 import java.util.Set;
 
 /**
- * Default Pregel-style PageRank computation.
+ * Benchmark for {@link PageRankVertex}
  */
 public class PageRankBenchmark extends GiraphBenchmark {
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(PageRankBenchmark.class);
-
-  /** Option for VertexEdges class */
-  private static final BenchmarkOption EDGES_CLASS = new BenchmarkOption(
-      "c", "edgesClass", true,
-      "Vertex edges class (0 for LongDoubleArrayEdges," +
-          "1 for ByteArrayEdges, " +
-          "2 for ByteArrayEdges with unsafe serialization, " +
-          "3 for ArrayListEdges, " +
-          "4 for HashMapVertex");
-  /** Option for using edge input */
-  private static final BenchmarkOption EDGE_INPUT = new BenchmarkOption(
-      "ei", "edgeInput", false,
-      "Use edge-based input instead of vertex-based input.");
-  /** Option for minimum ratio of partition-local edges */
-  private static final BenchmarkOption LOCAL_EDGES_MIN_RATIO =
-      new BenchmarkOption(
-          "l", "localEdgesMinRatio", true,
-          "Minimum ratio of partition-local edges (default is 0)");
-  /** Option for partitioning algorithm */
-  private static final BenchmarkOption PARTITIONER = new BenchmarkOption(
-      "p", "partitioner", true,
-      "Partitioning algorithm (0 for hash partitioning (default), " +
-          "1 for range partitioning)");
-  /** Option for type of combiner */
-  private static final BenchmarkOption COMBINER_TYPE = new BenchmarkOption(
-      "t", "combinerType", true,
-      "Combiner type (0 for no combiner, 1 for DoubleSumCombiner (default)");
-  /** Option for output format */
-  private static final BenchmarkOption OUTPUT_FORMAT = new BenchmarkOption(
-      "o", "vertexOutputFormat", true,
-      "0 for JsonBase64VertexOutputFormat");
-
   @Override
   public Set<BenchmarkOption> getBenchmarkOptions() {
-    return Sets.newHashSet(
-        BenchmarkOption.SUPERSTEPS, BenchmarkOption.VERTICES,
-        BenchmarkOption.EDGES_PER_VERTEX, EDGES_CLASS, EDGE_INPUT,
-        LOCAL_EDGES_MIN_RATIO, PARTITIONER, COMBINER_TYPE, OUTPUT_FORMAT);
+    return Sets.newHashSet(BenchmarkOption.VERTICES,
+        BenchmarkOption.EDGES_PER_VERTEX, BenchmarkOption.SUPERSTEPS,
+        BenchmarkOption.LOCAL_EDGES_MIN_RATIO);
   }
 
-  /**
-   * Set vertex edges, input format, partitioner classes and related parameters
-   * based on command-line arguments.
-   *
-   * @param cmd           Command line arguments
-   * @param configuration Giraph job configuration
-   */
-  protected void prepareConfiguration(GiraphConfiguration configuration,
+  @Override
+  protected void prepareConfiguration(GiraphConfiguration conf,
       CommandLine cmd) {
-    configuration.setVertexClass(PageRankVertex.class);
-    int edgesClassOption = EDGES_CLASS.getOptionIntValue(cmd, 1);
-    switch (edgesClassOption) {
-    case 0:
-      configuration.setVertexEdgesClass(LongDoubleArrayEdges.class);
-      break;
-    case 1:
-      configuration.setVertexEdgesClass(ByteArrayEdges.class);
-      break;
-    case 2:
-      configuration.setVertexEdgesClass(ByteArrayEdges.class);
-      configuration.useUnsafeSerialization(true);
-      break;
-    case 3:
-      configuration.setVertexEdgesClass(ArrayListEdges.class);
-      break;
-    case 4:
-      configuration.setVertexEdgesClass(HashMapEdges.class);
-      break;
-    default:
-      LOG.info("Unknown VertexEdges class, " +
-          "defaulting to LongDoubleArrayEdges");
-      configuration.setVertexEdgesClass(LongDoubleArrayEdges.class);
-    }
-
-    LOG.info("Using edges class " +
-        GiraphConstants.VERTEX_EDGES_CLASS.get(configuration));
-    if (COMBINER_TYPE.getOptionIntValue(cmd, 1) == 1) {
-      configuration.setVertexCombinerClass(DoubleSumCombiner.class);
-    }
-
-    if (EDGE_INPUT.optionTurnedOn(cmd)) {
-      configuration.setEdgeInputFormatClass(PseudoRandomEdgeInputFormat.class);
-    } else {
-      configuration.setVertexInputFormatClass(
-          PseudoRandomVertexInputFormat.class);
-    }
-
-    configuration.setLong(
-        PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
-        BenchmarkOption.VERTICES.getOptionLongValue(cmd));
-    configuration.setLong(
-        PseudoRandomInputFormatConstants.EDGES_PER_VERTEX,
-        BenchmarkOption.EDGES_PER_VERTEX.getOptionLongValue(cmd));
-    if (LOCAL_EDGES_MIN_RATIO.optionTurnedOn(cmd)) {
-      float localEdgesMinRatio =
-          Float.parseFloat(LOCAL_EDGES_MIN_RATIO.getOptionValue(cmd));
-      configuration.setFloat(
-          PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO,
-          localEdgesMinRatio);
-    }
-
-    if (OUTPUT_FORMAT.getOptionIntValue(cmd, -1) == 0) {
-      LOG.info("Using vertex output format class " +
-          JsonBase64VertexOutputFormat.class.getName());
-      configuration.setVertexOutputFormatClass(
-          JsonBase64VertexOutputFormat.class);
-    }
-
-    if (PARTITIONER.getOptionIntValue(cmd, 0) == 1) {
-      configuration.setGraphPartitionerFactoryClass(
-          SimpleLongRangePartitionerFactory.class);
-    }
+    conf.setVertexClass(PageRankVertex.class);
+    conf.setVertexEdgesClass(IntNullArrayEdges.class);
+    conf.setVertexCombinerClass(FloatSumCombiner.class);
+    conf.setVertexInputFormatClass(
+        PseudoRandomIntNullVertexInputFormat.class);
 
-    configuration.setInt(PageRankVertex.SUPERSTEP_COUNT,
+    conf.setInt(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
+        BenchmarkOption.VERTICES.getOptionIntValue(cmd));
+    conf.setInt(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX,
+        BenchmarkOption.EDGES_PER_VERTEX.getOptionIntValue(cmd));
+    conf.setInt(PageRankVertex.SUPERSTEP_COUNT,
         BenchmarkOption.SUPERSTEPS.getOptionIntValue(cmd));
+    conf.setFloat(PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO,
+        BenchmarkOption.LOCAL_EDGES_MIN_RATIO.getOptionFloatValue(cmd,
+            PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO_DEFAULT));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankVertex.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankVertex.java
index 3f394b4..9900a44 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankVertex.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankVertex.java
@@ -19,33 +19,35 @@
 package org.apache.giraph.benchmark;
 
 import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
 
 import java.io.IOException;
 
 /**
- * PageRank algorithm.
+ * Implementation of PageRank in which vertex ids are ints, page rank values
+ * are floats, and graph is unweighted.
  */
-public class PageRankVertex extends Vertex<LongWritable, DoubleWritable,
-    DoubleWritable, DoubleWritable> {
+public class PageRankVertex extends Vertex<IntWritable, FloatWritable,
+    NullWritable, FloatWritable> {
   /** Number of supersteps */
   public static final String SUPERSTEP_COUNT =
-      "giraph.pageRankBenchmark.superstepCount";
+      "giraph.pageRank.superstepCount";
 
   @Override
-  public void compute(Iterable<DoubleWritable> messages) throws IOException {
+  public void compute(Iterable<FloatWritable> messages) throws IOException {
     if (getSuperstep() >= 1) {
-      double sum = 0;
-      for (DoubleWritable message : messages) {
+      float sum = 0;
+      for (FloatWritable message : messages) {
         sum += message.get();
       }
       getValue().set((0.15f / getTotalNumVertices()) + 0.85f * sum);
     }
 
     if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, 0)) {
-      long edges = getNumEdges();
-      sendMessageToAllEdges(new DoubleWritable(getValue().get() / edges));
+      sendMessageToAllEdges(
+          new FloatWritable(getValue().get() / getNumEdges()));
     } else {
       voteToHalt();
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java
new file mode 100644
index 0000000..4c76996
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.benchmark;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.combiner.DoubleSumCombiner;
+import org.apache.giraph.edge.ArrayListEdges;
+import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.edge.HashMapEdges;
+import org.apache.giraph.edge.LongDoubleArrayEdges;
+import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
+import org.apache.giraph.io.formats.JsonBase64VertexOutputFormat;
+import org.apache.giraph.io.formats.PseudoRandomEdgeInputFormat;
+import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
+import org.apache.giraph.partition.SimpleLongRangePartitionerFactory;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Sets;
+
+import java.util.Set;
+
+/**
+ * Benchmark for {@link WeightedPageRankVertex}
+ */
+public class WeightedPageRankBenchmark extends GiraphBenchmark {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(WeightedPageRankBenchmark.class);
+
+  /** Option for VertexEdges class */
+  private static final BenchmarkOption EDGES_CLASS = new BenchmarkOption(
+      "c", "edgesClass", true,
+      "Vertex edges class (0 for LongDoubleArrayEdges," +
+          "1 for ByteArrayEdges, " +
+          "2 for ByteArrayEdges with unsafe serialization, " +
+          "3 for ArrayListEdges, " +
+          "4 for HashMapVertex");
+  /** Option for using edge input */
+  private static final BenchmarkOption EDGE_INPUT = new BenchmarkOption(
+      "ei", "edgeInput", false,
+      "Use edge-based input instead of vertex-based input.");
+  /** Option for partitioning algorithm */
+  private static final BenchmarkOption PARTITIONER = new BenchmarkOption(
+      "p", "partitioner", true,
+      "Partitioning algorithm (0 for hash partitioning (default), " +
+          "1 for range partitioning)");
+  /** Option for type of combiner */
+  private static final BenchmarkOption COMBINER_TYPE = new BenchmarkOption(
+      "t", "combinerType", true,
+      "Combiner type (0 for no combiner, 1 for DoubleSumCombiner (default)");
+  /** Option for output format */
+  private static final BenchmarkOption OUTPUT_FORMAT = new BenchmarkOption(
+      "o", "vertexOutputFormat", true,
+      "0 for JsonBase64VertexOutputFormat");
+
+  @Override
+  public Set<BenchmarkOption> getBenchmarkOptions() {
+    return Sets.newHashSet(
+        BenchmarkOption.SUPERSTEPS, BenchmarkOption.VERTICES,
+        BenchmarkOption.EDGES_PER_VERTEX, BenchmarkOption.LOCAL_EDGES_MIN_RATIO,
+        EDGES_CLASS, EDGE_INPUT, PARTITIONER, COMBINER_TYPE, OUTPUT_FORMAT);
+  }
+
+  /**
+   * Set vertex edges, input format, partitioner classes and related parameters
+   * based on command-line arguments.
+   *
+   * @param cmd           Command line arguments
+   * @param configuration Giraph job configuration
+   */
+  protected void prepareConfiguration(GiraphConfiguration configuration,
+      CommandLine cmd) {
+    configuration.setVertexClass(WeightedPageRankVertex.class);
+    int edgesClassOption = EDGES_CLASS.getOptionIntValue(cmd, 1);
+    switch (edgesClassOption) {
+    case 0:
+      configuration.setVertexEdgesClass(LongDoubleArrayEdges.class);
+      break;
+    case 1:
+      configuration.setVertexEdgesClass(ByteArrayEdges.class);
+      break;
+    case 2:
+      configuration.setVertexEdgesClass(ByteArrayEdges.class);
+      configuration.useUnsafeSerialization(true);
+      break;
+    case 3:
+      configuration.setVertexEdgesClass(ArrayListEdges.class);
+      break;
+    case 4:
+      configuration.setVertexEdgesClass(HashMapEdges.class);
+      break;
+    default:
+      LOG.info("Unknown VertexEdges class, " +
+          "defaulting to LongDoubleArrayEdges");
+      configuration.setVertexEdgesClass(LongDoubleArrayEdges.class);
+    }
+
+    LOG.info("Using edges class " +
+        GiraphConstants.VERTEX_EDGES_CLASS.get(configuration));
+    if (COMBINER_TYPE.getOptionIntValue(cmd, 1) == 1) {
+      configuration.setVertexCombinerClass(DoubleSumCombiner.class);
+    }
+
+    if (EDGE_INPUT.optionTurnedOn(cmd)) {
+      configuration.setEdgeInputFormatClass(PseudoRandomEdgeInputFormat.class);
+    } else {
+      configuration.setVertexInputFormatClass(
+          PseudoRandomVertexInputFormat.class);
+    }
+
+    configuration.setLong(
+        PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
+        BenchmarkOption.VERTICES.getOptionLongValue(cmd));
+    configuration.setLong(
+        PseudoRandomInputFormatConstants.EDGES_PER_VERTEX,
+        BenchmarkOption.EDGES_PER_VERTEX.getOptionLongValue(cmd));
+    configuration.setFloat(
+        PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO,
+        BenchmarkOption.LOCAL_EDGES_MIN_RATIO.getOptionFloatValue(cmd,
+            PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO_DEFAULT));
+
+    if (OUTPUT_FORMAT.getOptionIntValue(cmd, -1) == 0) {
+      LOG.info("Using vertex output format class " +
+          JsonBase64VertexOutputFormat.class.getName());
+      configuration.setVertexOutputFormatClass(
+          JsonBase64VertexOutputFormat.class);
+    }
+
+    if (PARTITIONER.getOptionIntValue(cmd, 0) == 1) {
+      configuration.setGraphPartitionerFactoryClass(
+          SimpleLongRangePartitionerFactory.class);
+    }
+
+    configuration.setInt(WeightedPageRankVertex.SUPERSTEP_COUNT,
+        BenchmarkOption.SUPERSTEPS.getOptionIntValue(cmd));
+  }
+
+  /**
+   * Execute the benchmark.
+   *
+   * @param args Typically the command line arguments.
+   * @throws Exception Any exception from the computation.
+   */
+  public static void main(final String[] args) throws Exception {
+    System.exit(ToolRunner.run(new WeightedPageRankBenchmark(), args));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankVertex.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankVertex.java
new file mode 100644
index 0000000..70f0f61
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankVertex.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.MutableEdge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.IOException;
+
+/**
+ * Implementation of Page Rank algorithm on a weighted graph.
+ */
+public class WeightedPageRankVertex extends Vertex<LongWritable, DoubleWritable,
+    DoubleWritable, DoubleWritable> {
+  /** Number of supersteps */
+  public static final String SUPERSTEP_COUNT =
+      "giraph.weightedPageRank.superstepCount";
+
+  @Override
+  public void compute(Iterable<DoubleWritable> messages) throws IOException {
+    if (getSuperstep() == 0) {
+      // Normalize out edge weights
+      double outEdgeSum = 0;
+      for (Edge<LongWritable, DoubleWritable> edge : getEdges()) {
+        outEdgeSum += edge.getValue().get();
+      }
+      for (MutableEdge<LongWritable, DoubleWritable> edge : getMutableEdges()) {
+        edge.setValue(new DoubleWritable(edge.getValue().get() / outEdgeSum));
+      }
+    } else {
+      double messageSum = 0;
+      for (DoubleWritable message : messages) {
+        messageSum += message.get();
+      }
+      getValue().set((0.15f / getTotalNumVertices()) + 0.85f * messageSum);
+    }
+
+    if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, 0)) {
+      for (Edge<LongWritable, DoubleWritable> edge : getEdges()) {
+        sendMessage(edge.getTargetVertexId(),
+            new DoubleWritable(getValue().get() * edge.getValue().get()));
+      }
+    } else {
+      voteToHalt();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumCombiner.java
new file mode 100644
index 0000000..d898791
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumCombiner.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.combiner;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+
+/**
+ * A combiner that sums float-valued messages
+ */
+public class FloatSumCombiner extends
+    Combiner<IntWritable, FloatWritable> {
+  @Override
+  public void combine(IntWritable vertexIndex, FloatWritable originalMessage,
+      FloatWritable messageToCombine) {
+    originalMessage.set(originalMessage.get() + messageToCombine.get());
+  }
+
+  @Override
+  public FloatWritable createInitialMessage() {
+    return new FloatWritable(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/edge/IntNullArrayEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/IntNullArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/IntNullArrayEdges.java
new file mode 100644
index 0000000..2363caf
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/IntNullArrayEdges.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntIterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * Implementation of {@link VertexEdges} with int ids and null edge
+ * values, backed by dynamic primitive array.
+ * Parallel edges are allowed.
+ * Note: this implementation is optimized for space usage,
+ * but edge removals are expensive.
+ */
+public class IntNullArrayEdges
+    implements ReuseObjectsVertexEdges<IntWritable, NullWritable> {
+  /** Array of target vertex ids */
+  private IntArrayList neighbors;
+
+  @Override
+  public void initialize(Iterable<Edge<IntWritable, NullWritable>> edges) {
+    // If the iterable is actually a collection, we can cheaply get the
+    // size and initialize the array with the expected capacity.
+    if (edges instanceof Collection) {
+      int numEdges =
+          ((Collection<Edge<IntWritable, NullWritable>>) edges).size();
+      initialize(numEdges);
+    } else {
+      initialize();
+    }
+    for (Edge<IntWritable, NullWritable> edge : edges) {
+      add(edge);
+    }
+  }
+
+  @Override
+  public void initialize(int capacity) {
+    neighbors = new IntArrayList(capacity);
+  }
+
+  @Override
+  public void initialize() {
+    neighbors = new IntArrayList();
+  }
+
+  @Override
+  public int size() {
+    return neighbors.size();
+  }
+
+  @Override
+  public void add(Edge<IntWritable, NullWritable> edge) {
+    neighbors.add(edge.getTargetVertexId().get());
+  }
+
+  /**
+   * Remove edge at position i.
+   *
+   * @param i Position of edge to be removed
+   */
+  private void removeAt(int i) {
+    // The order of the edges is irrelevant, so we can simply replace
+    // the deleted edge with the rightmost element, thus achieving constant
+    // time.
+    if (i == neighbors.size() - 1) {
+      neighbors.popInt();
+    } else {
+      neighbors.set(i, neighbors.popInt());
+    }
+  }
+
+  @Override
+  public void remove(IntWritable targetVertexId) {
+    // Thanks to the constant-time implementation of removeAt(int),
+    // we can remove all matching edges in linear time.
+    for (int i = neighbors.size() - 1; i >= 0; --i) {
+      if (neighbors.get(i) == targetVertexId.get()) {
+        removeAt(i);
+      }
+    }
+  }
+
+  @Override
+  public Iterator<Edge<IntWritable, NullWritable>> iterator() {
+    // Returns an iterator that reuses objects.
+    return new UnmodifiableIterator<Edge<IntWritable, NullWritable>>() {
+      /** Wrapped neighbors iterator. */
+      private IntIterator neighborsIt = neighbors.iterator();
+      /** Representative edge object. */
+      private Edge<IntWritable, NullWritable> representativeEdge =
+          EdgeFactory.create(new IntWritable(), NullWritable.get());
+
+      @Override
+      public boolean hasNext() {
+        return neighborsIt.hasNext();
+      }
+
+      @Override
+      public Edge<IntWritable, NullWritable> next() {
+        representativeEdge.getTargetVertexId().set(neighborsIt.nextInt());
+        return representativeEdge;
+      }
+    };
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(neighbors.size());
+    IntIterator iterator = neighbors.iterator();
+    while (iterator.hasNext()) {
+      out.writeInt(iterator.nextInt());
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numEdges = in.readInt();
+    initialize(numEdges);
+    for (int i = 0; i < numEdges; ++i) {
+      neighbors.add(in.readInt());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
index 87cadb4..2cc4dba 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
@@ -20,7 +20,6 @@ package org.apache.giraph.io.formats;
 
 import com.google.common.collect.Sets;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
@@ -47,13 +46,7 @@ public class PseudoRandomEdgeInputFormat
   public final List<InputSplit> getSplits(final JobContext context,
                                           final int minSplitCountHint)
     throws IOException, InterruptedException {
-    // This is meaningless, the PseudoRandomEdgeReader will generate
-    // all the test data
-    List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
-    for (int i = 0; i < minSplitCountHint; ++i) {
-      inputSplitList.add(new BspInputSplit(i, minSplitCountHint));
-    }
-    return inputSplitList;
+    return PseudoRandomUtils.getSplits(minSplitCountHint);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java
 
b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java
new file mode 100644
index 0000000..46997a8
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.partition.PartitionUtils;
+import org.apache.giraph.worker.WorkerInfo;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Helper class to generate pseudo-random local edges.
+ * Like {@link PseudoRandomLocalEdgesHelper}, but for graphs where vertex ids
+ * are integers.
+ */
+public class PseudoRandomIntNullLocalEdgesHelper {
+  /** Minimum ratio of partition-local edges. */
+  private final float minLocalEdgesRatio;
+  /** Total number of vertices. */
+  private final int numVertices;
+  /** Total number of partitions. */
+  private final int numPartitions;
+  /** Vertices per partition (integer division, so possibly 0). */
+  private final int partitionSize;
+
+  /**
+   * Reads the local-edge ratio and derives the partition count from conf.
+   *
+   * @param numVertices        Total number of vertices.
+   * @param conf               Configuration.
+   */
+  public PseudoRandomIntNullLocalEdgesHelper(int numVertices,
+      ImmutableClassesGiraphConfiguration conf) {
+    this.minLocalEdgesRatio = conf.getFloat(
+        PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO,
+        PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO_DEFAULT);
+    this.numVertices = numVertices;
+    int numWorkers = conf.getMaxWorkers();
+    List<WorkerInfo> workerInfos = Collections.nCopies(numWorkers,
+        new WorkerInfo());
+    numPartitions = PartitionUtils.computePartitionCount(workerInfos,
+        numWorkers, conf);
+    partitionSize = numVertices / numPartitions;
+  }
+
+  /**
+   * Generate a destination vertex id for the given source vertex. With
+   * probability minLocalEdgesRatio the id is congruent to the source id
+   * modulo numPartitions; otherwise it is uniform over all vertex ids.
+   *
+   * @param sourceVertexId Source vertex id.
+   * @param rand           Pseudo-random generator.
+   * @return Destination vertex id.
+   */
+  public int generateDestVertex(int sourceVertexId, Random rand) {
+    // nextInt(0) throws, so fall back to a global pick for empty partitions.
+    if (rand.nextFloat() < minLocalEdgesRatio && partitionSize > 0) {
+      int partitionId = sourceVertexId % numPartitions;
+      return partitionId + numPartitions * rand.nextInt(partitionSize);
+    }
+    return rand.nextInt(numVertices);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java
 
b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java
new file mode 100644
index 0000000..b27fcc8
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.bsp.BspInputSplit;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.ReusableEdge;
+import org.apache.giraph.edge.VertexEdges;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * VertexInputFormat for large scale testing,
+ * like {@link PseudoRandomVertexInputFormat}, but for unweighted graphs
+ * where vertex ids are integers.
+ */
+public class PseudoRandomIntNullVertexInputFormat extends
+    VertexInputFormat<IntWritable, FloatWritable, NullWritable> {
+  @Override
+  public final List<InputSplit> getSplits(final JobContext context,
+      final int minSplitCountHint) throws IOException, InterruptedException {
+    return PseudoRandomUtils.getSplits(minSplitCountHint);
+  }
+
+  @Override
+  public VertexReader<IntWritable, FloatWritable, NullWritable>
+  createVertexReader(InputSplit split,
+      TaskAttemptContext context) throws IOException {
+    return new PseudoRandomVertexReader();
+  }
+
+  /**
+   * Used by {@link PseudoRandomIntNullVertexInputFormat} to read
+   * pseudo-randomly generated data.
+   */
+  private static class PseudoRandomVertexReader extends
+      VertexReader<IntWritable, FloatWritable, NullWritable> {
+    /** Starting vertex id. */
+    private int startingVertexId = -1;
+    /** Vertices read so far. */
+    private int verticesRead = 0;
+    /** Total vertices to read (on this split alone). */
+    private int totalSplitVertices = -1;
+    /** Edges per vertex (targets of one vertex are all distinct). */
+    private int edgesPerVertex = -1;
+    /** Reusable set of destination ids picked for the current vertex */
+    private final IntSet destVertices = new IntOpenHashSet();
+    /** Reusable edge object, null unless edge-object reuse is enabled */
+    private ReusableEdge<IntWritable, NullWritable> reusableEdge = null;
+    /** Helper for generating pseudo-random local edges. */
+    private PseudoRandomIntNullLocalEdgesHelper localEdgesHelper;
+    /** Random generator, seeded with the split index */
+    private Random rand;
+
+    /** Default constructor for reflection. */
+    public PseudoRandomVertexReader() {
+    }
+
+    @Override
+    public void initialize(InputSplit inputSplit,
+        TaskAttemptContext context) throws IOException {
+      int aggregateVertices = getConf().getInt(
+          PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 0);
+      BspInputSplit bspInputSplit = (BspInputSplit) inputSplit;
+      int extraVertices = aggregateVertices % bspInputSplit.getNumSplits();
+      totalSplitVertices = aggregateVertices / bspInputSplit.getNumSplits();
+      if (bspInputSplit.getSplitIndex() < extraVertices) {
+        ++totalSplitVertices; // first splits absorb the remainder
+      }
+      startingVertexId = bspInputSplit.getSplitIndex() *
+          (aggregateVertices / bspInputSplit.getNumSplits()) +
+          Math.min(bspInputSplit.getSplitIndex(), extraVertices);
+      edgesPerVertex = getConf().getInt(
+          PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0);
+      rand = new Random(bspInputSplit.getSplitIndex());
+      if (getConf().reuseEdgeObjects()) {
+        reusableEdge = getConf().createReusableEdge();
+      }
+      localEdgesHelper = new PseudoRandomIntNullLocalEdgesHelper(
+          aggregateVertices, getConf());
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+      return totalSplitVertices > verticesRead;
+    }
+
+    @Override
+    public Vertex<IntWritable, FloatWritable, NullWritable, ?>
+    getCurrentVertex() throws IOException, InterruptedException {
+      Vertex<IntWritable, FloatWritable, NullWritable, ?> vertex =
+          getConf().createVertex();
+      int vertexId = startingVertexId + verticesRead;
+      VertexEdges<IntWritable, NullWritable> edges =
+          getConf().createVertexEdges();
+      edges.initialize(edgesPerVertex);
+      destVertices.clear(); // the set is shared across vertices
+      for (int i = 0; i < edgesPerVertex; ++i) {
+        int destVertexId;
+        do {
+          destVertexId = localEdgesHelper.generateDestVertex(vertexId, rand);
+        } while (!destVertices.add(destVertexId)); // redraw duplicates
+        Edge<IntWritable, NullWritable> edge =
+            (reusableEdge == null) ? getConf().createEdge() : reusableEdge;
+        edge.getTargetVertexId().set(destVertexId);
+        edges.add(edge);
+      }
+      vertex.initialize(
+          new IntWritable(vertexId), new FloatWritable(1.0f), edges);
+      ++verticesRead; // advancing happens here, not in nextVertex()
+      return vertex;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return verticesRead * 100.0f / totalSplitVertices; // NaN on empty split
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomUtils.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomUtils.java 
b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomUtils.java
new file mode 100644
index 0000000..6def976
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.bsp.BspInputSplit;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Static helpers shared by the PseudoRandom input formats.
+ */
+public class PseudoRandomUtils {
+  /** Utility class, never instantiated. */
+  private PseudoRandomUtils() { }
+
+  /**
+   * Build one {@link BspInputSplit} for every index in [0, numSplits).
+   *
+   * @param numSplits Number of splits to produce
+   * @return List holding the created {@link BspInputSplit}s
+   */
+  public static List<InputSplit> getSplits(int numSplits) throws IOException,
+      InterruptedException {
+    final List<InputSplit> splits = new ArrayList<InputSplit>();
+    for (int index = 0; index < numSplits; ++index) {
+      splits.add(new BspInputSplit(index, numSplits));
+    }
+    return splits;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
 
b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
index dca0271..6703b22 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
@@ -20,7 +20,6 @@ package org.apache.giraph.io.formats;
 
 import com.google.common.collect.Sets;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
@@ -48,13 +47,7 @@ public class PseudoRandomVertexInputFormat extends
   @Override
   public final List<InputSplit> getSplits(final JobContext context,
       final int minSplitCountHint) throws IOException, InterruptedException {
-    // This is meaningless, the PseudoRandomVertexReader will generate
-    // all the test data
-    List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
-    for (int i = 0; i < minSplitCountHint; ++i) {
-      inputSplitList.add(new BspInputSplit(i, minSplitCountHint));
-    }
-    return inputSplitList;
+    return PseudoRandomUtils.getSplits(minSplitCountHint);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java 
b/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
index 0117ce9..ae9441e 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.io;
 
 import org.apache.giraph.BspCase;
-import org.apache.giraph.benchmark.PageRankVertex;
+import org.apache.giraph.benchmark.WeightedPageRankVertex;
 import org.apache.giraph.conf.GiraphClasses;
 import org.apache.giraph.io.formats.GiraphFileInputFormat;
 import org.apache.giraph.io.formats.JsonBase64VertexInputFormat;
@@ -62,7 +62,7 @@ public class TestJsonBase64Format extends BspCase {
 
     Path outputPath = getTempPath(getCallingMethodName());
     GiraphClasses classes = new GiraphClasses();
-    classes.setVertexClass(PageRankVertex.class);
+    classes.setVertexClass(WeightedPageRankVertex.class);
     classes.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
     classes.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
     GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath);
@@ -70,24 +70,24 @@ public class TestJsonBase64Format extends BspCase {
         PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 101);
     job.getConfiguration().setLong(
         PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 2);
-    job.getConfiguration().setInt(PageRankVertex.SUPERSTEP_COUNT, 2);
+    job.getConfiguration().setInt(WeightedPageRankVertex.SUPERSTEP_COUNT, 2);
 
     assertTrue(job.run(true));
 
     Path outputPath2 = getTempPath(getCallingMethodName() + "2");
     classes = new GiraphClasses();
-    classes.setVertexClass(PageRankVertex.class);
+    classes.setVertexClass(WeightedPageRankVertex.class);
     classes.setVertexInputFormatClass(JsonBase64VertexInputFormat.class);
     classes.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
     job = prepareJob(getCallingMethodName(), classes, outputPath2);
-    job.getConfiguration().setInt(PageRankVertex.SUPERSTEP_COUNT, 3);
+    job.getConfiguration().setInt(WeightedPageRankVertex.SUPERSTEP_COUNT, 3);
     GiraphFileInputFormat.addVertexInputPath(
       job.getInternalJob().getConfiguration(), outputPath);
     assertTrue(job.run(true));
 
     Path outputPath3 = getTempPath(getCallingMethodName() + "3");
     classes = new GiraphClasses();
-    classes.setVertexClass(PageRankVertex.class);
+    classes.setVertexClass(WeightedPageRankVertex.class);
     classes.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
     classes.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
     job = prepareJob(getCallingMethodName(), classes, outputPath3);
@@ -95,7 +95,7 @@ public class TestJsonBase64Format extends BspCase {
         PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 101);
     job.getConfiguration().setLong(
         PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 2);
-    job.getConfiguration().setInt(PageRankVertex.SUPERSTEP_COUNT, 5);
+    job.getConfiguration().setInt(WeightedPageRankVertex.SUPERSTEP_COUNT, 5);
     assertTrue(job.run(true));
 
     Configuration conf = job.getConfiguration();

Reply via email to