Updated Branches: refs/heads/trunk 621a022c0 -> 96fd05385
GIRAPH-543: Fix PageRankBenchmark and make WeightedPageRankBenchmark (majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/96fd0538 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/96fd0538 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/96fd0538 Branch: refs/heads/trunk Commit: 96fd05385199460bb731270e8dc42a5408175ab9 Parents: 621a022 Author: Maja Kabiljo <[email protected]> Authored: Wed Apr 10 15:08:32 2013 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Wed Apr 10 15:11:15 2013 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../apache/giraph/benchmark/BenchmarkOption.java | 28 +++ .../apache/giraph/benchmark/PageRankBenchmark.java | 145 ++----------- .../apache/giraph/benchmark/PageRankVertex.java | 24 ++- .../benchmark/WeightedPageRankBenchmark.java | 165 +++++++++++++++ .../giraph/benchmark/WeightedPageRankVertex.java | 66 ++++++ .../apache/giraph/combiner/FloatSumCombiner.java | 39 ++++ .../org/apache/giraph/edge/IntNullArrayEdges.java | 150 +++++++++++++ .../io/formats/PseudoRandomEdgeInputFormat.java | 9 +- .../PseudoRandomIntNullLocalEdgesHelper.java | 81 +++++++ .../PseudoRandomIntNullVertexInputFormat.java | 153 +++++++++++++ .../giraph/io/formats/PseudoRandomUtils.java | 49 +++++ .../io/formats/PseudoRandomVertexInputFormat.java | 9 +- .../org/apache/giraph/io/TestJsonBase64Format.java | 14 +- 14 files changed, 778 insertions(+), 156 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index d9c88ec..babbb88 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-543: Fix PageRankBenchmark and make WeightedPageRankBenchmark (majakabiljo) + 
GIRAPH-615: Add support for multithreaded output (majakabiljo) GIRAPH-612: Improve website for upcoming release (aching) http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/benchmark/BenchmarkOption.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/BenchmarkOption.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/BenchmarkOption.java index 0771ca2..23c614b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/benchmark/BenchmarkOption.java +++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/BenchmarkOption.java @@ -50,6 +50,11 @@ public class BenchmarkOption { new BenchmarkOption("e", "edgesPerVertex", true, "Edges per vertex", "Need to set the number of edges per vertex (-e)"); + /** Option for minimum ratio of partition-local edges */ + public static final BenchmarkOption LOCAL_EDGES_MIN_RATIO = + new BenchmarkOption( + "l", "localEdgesMinRatio", true, + "Minimum ratio of partition-local edges (default is 0)"); /** Short option */ private String shortOption; @@ -187,4 +192,27 @@ public class BenchmarkOption { public long getOptionLongValue(CommandLine cmd) { return Long.parseLong(getOptionValue(cmd)); } + + /** + * Retrieve the argument of this option as float value + * + * @param cmd CommandLine + * @return Value of the argument as float value + */ + public float getOptionFloatValue(CommandLine cmd) { + return Float.parseFloat(getOptionValue(cmd)); + } + + /** + * Retrieve the argument of this option as float value, + * or default value if option is not set + * + * @param cmd CommandLine + * @param defaultValue Default value + * @return Value of the argument as float value, + * or default value if option is not set + */ + public float getOptionFloatValue(CommandLine cmd, float defaultValue) { + return optionTurnedOn(cmd) ? 
getOptionFloatValue(cmd) : defaultValue; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java index fbb2516..77fa83c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java +++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java @@ -15,149 +15,50 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.giraph.benchmark; import org.apache.commons.cli.CommandLine; +import org.apache.giraph.combiner.FloatSumCombiner; import org.apache.giraph.conf.GiraphConfiguration; -import org.apache.giraph.conf.GiraphConstants; -import org.apache.giraph.combiner.DoubleSumCombiner; -import org.apache.giraph.edge.ArrayListEdges; -import org.apache.giraph.edge.ByteArrayEdges; -import org.apache.giraph.edge.HashMapEdges; -import org.apache.giraph.edge.LongDoubleArrayEdges; +import org.apache.giraph.edge.IntNullArrayEdges; import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants; -import org.apache.giraph.io.formats.JsonBase64VertexOutputFormat; -import org.apache.giraph.io.formats.PseudoRandomEdgeInputFormat; -import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat; -import org.apache.giraph.partition.SimpleLongRangePartitionerFactory; +import org.apache.giraph.io.formats.PseudoRandomIntNullVertexInputFormat; import org.apache.hadoop.util.ToolRunner; -import org.apache.log4j.Logger; import com.google.common.collect.Sets; import java.util.Set; /** - * Default Pregel-style PageRank computation. 
+ * Benchmark for {@link PageRankVertex} */ public class PageRankBenchmark extends GiraphBenchmark { - /** Class logger */ - private static final Logger LOG = Logger.getLogger(PageRankBenchmark.class); - - /** Option for VertexEdges class */ - private static final BenchmarkOption EDGES_CLASS = new BenchmarkOption( - "c", "edgesClass", true, - "Vertex edges class (0 for LongDoubleArrayEdges," + - "1 for ByteArrayEdges, " + - "2 for ByteArrayEdges with unsafe serialization, " + - "3 for ArrayListEdges, " + - "4 for HashMapVertex"); - /** Option for using edge input */ - private static final BenchmarkOption EDGE_INPUT = new BenchmarkOption( - "ei", "edgeInput", false, - "Use edge-based input instead of vertex-based input."); - /** Option for minimum ratio of partition-local edges */ - private static final BenchmarkOption LOCAL_EDGES_MIN_RATIO = - new BenchmarkOption( - "l", "localEdgesMinRatio", true, - "Minimum ratio of partition-local edges (default is 0)"); - /** Option for partitioning algorithm */ - private static final BenchmarkOption PARTITIONER = new BenchmarkOption( - "p", "partitioner", true, - "Partitioning algorithm (0 for hash partitioning (default), " + - "1 for range partitioning)"); - /** Option for type of combiner */ - private static final BenchmarkOption COMBINER_TYPE = new BenchmarkOption( - "t", "combinerType", true, - "Combiner type (0 for no combiner, 1 for DoubleSumCombiner (default)"); - /** Option for output format */ - private static final BenchmarkOption OUTPUT_FORMAT = new BenchmarkOption( - "o", "vertexOutputFormat", true, - "0 for JsonBase64VertexOutputFormat"); - @Override public Set<BenchmarkOption> getBenchmarkOptions() { - return Sets.newHashSet( - BenchmarkOption.SUPERSTEPS, BenchmarkOption.VERTICES, - BenchmarkOption.EDGES_PER_VERTEX, EDGES_CLASS, EDGE_INPUT, - LOCAL_EDGES_MIN_RATIO, PARTITIONER, COMBINER_TYPE, OUTPUT_FORMAT); + return Sets.newHashSet(BenchmarkOption.VERTICES, + BenchmarkOption.EDGES_PER_VERTEX, 
BenchmarkOption.SUPERSTEPS, + BenchmarkOption.LOCAL_EDGES_MIN_RATIO); } - /** - * Set vertex edges, input format, partitioner classes and related parameters - * based on command-line arguments. - * - * @param cmd Command line arguments - * @param configuration Giraph job configuration - */ - protected void prepareConfiguration(GiraphConfiguration configuration, + @Override + protected void prepareConfiguration(GiraphConfiguration conf, CommandLine cmd) { - configuration.setVertexClass(PageRankVertex.class); - int edgesClassOption = EDGES_CLASS.getOptionIntValue(cmd, 1); - switch (edgesClassOption) { - case 0: - configuration.setVertexEdgesClass(LongDoubleArrayEdges.class); - break; - case 1: - configuration.setVertexEdgesClass(ByteArrayEdges.class); - break; - case 2: - configuration.setVertexEdgesClass(ByteArrayEdges.class); - configuration.useUnsafeSerialization(true); - break; - case 3: - configuration.setVertexEdgesClass(ArrayListEdges.class); - break; - case 4: - configuration.setVertexEdgesClass(HashMapEdges.class); - break; - default: - LOG.info("Unknown VertexEdges class, " + - "defaulting to LongDoubleArrayEdges"); - configuration.setVertexEdgesClass(LongDoubleArrayEdges.class); - } - - LOG.info("Using edges class " + - GiraphConstants.VERTEX_EDGES_CLASS.get(configuration)); - if (COMBINER_TYPE.getOptionIntValue(cmd, 1) == 1) { - configuration.setVertexCombinerClass(DoubleSumCombiner.class); - } - - if (EDGE_INPUT.optionTurnedOn(cmd)) { - configuration.setEdgeInputFormatClass(PseudoRandomEdgeInputFormat.class); - } else { - configuration.setVertexInputFormatClass( - PseudoRandomVertexInputFormat.class); - } - - configuration.setLong( - PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, - BenchmarkOption.VERTICES.getOptionLongValue(cmd)); - configuration.setLong( - PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, - BenchmarkOption.EDGES_PER_VERTEX.getOptionLongValue(cmd)); - if (LOCAL_EDGES_MIN_RATIO.optionTurnedOn(cmd)) { - float localEdgesMinRatio = - 
Float.parseFloat(LOCAL_EDGES_MIN_RATIO.getOptionValue(cmd)); - configuration.setFloat( - PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO, - localEdgesMinRatio); - } - - if (OUTPUT_FORMAT.getOptionIntValue(cmd, -1) == 0) { - LOG.info("Using vertex output format class " + - JsonBase64VertexOutputFormat.class.getName()); - configuration.setVertexOutputFormatClass( - JsonBase64VertexOutputFormat.class); - } - - if (PARTITIONER.getOptionIntValue(cmd, 0) == 1) { - configuration.setGraphPartitionerFactoryClass( - SimpleLongRangePartitionerFactory.class); - } + conf.setVertexClass(PageRankVertex.class); + conf.setVertexEdgesClass(IntNullArrayEdges.class); + conf.setVertexCombinerClass(FloatSumCombiner.class); + conf.setVertexInputFormatClass( + PseudoRandomIntNullVertexInputFormat.class); - configuration.setInt(PageRankVertex.SUPERSTEP_COUNT, + conf.setInt(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, + BenchmarkOption.VERTICES.getOptionIntValue(cmd)); + conf.setInt(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, + BenchmarkOption.EDGES_PER_VERTEX.getOptionIntValue(cmd)); + conf.setInt(PageRankVertex.SUPERSTEP_COUNT, BenchmarkOption.SUPERSTEPS.getOptionIntValue(cmd)); + conf.setFloat(PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO, + BenchmarkOption.LOCAL_EDGES_MIN_RATIO.getOptionFloatValue(cmd, + PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO_DEFAULT)); } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankVertex.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankVertex.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankVertex.java index 3f394b4..9900a44 100644 --- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankVertex.java +++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankVertex.java @@ -19,33 +19,35 @@ package 
org.apache.giraph.benchmark; import org.apache.giraph.graph.Vertex; -import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; import java.io.IOException; /** - * PageRank algorithm. + * Implementation of PageRank in which vertex ids are ints, page rank values + * are floats, and graph is unweighted. */ -public class PageRankVertex extends Vertex<LongWritable, DoubleWritable, - DoubleWritable, DoubleWritable> { +public class PageRankVertex extends Vertex<IntWritable, FloatWritable, + NullWritable, FloatWritable> { /** Number of supersteps */ public static final String SUPERSTEP_COUNT = - "giraph.pageRankBenchmark.superstepCount"; + "giraph.pageRank.superstepCount"; @Override - public void compute(Iterable<DoubleWritable> messages) throws IOException { + public void compute(Iterable<FloatWritable> messages) throws IOException { if (getSuperstep() >= 1) { - double sum = 0; - for (DoubleWritable message : messages) { + float sum = 0; + for (FloatWritable message : messages) { sum += message.get(); } getValue().set((0.15f / getTotalNumVertices()) + 0.85f * sum); } if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, 0)) { - long edges = getNumEdges(); - sendMessageToAllEdges(new DoubleWritable(getValue().get() / edges)); + sendMessageToAllEdges( + new FloatWritable(getValue().get() / getNumEdges())); } else { voteToHalt(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java new file mode 100644 index 0000000..4c76996 --- /dev/null +++ 
b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.benchmark; + +import org.apache.commons.cli.CommandLine; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.combiner.DoubleSumCombiner; +import org.apache.giraph.edge.ArrayListEdges; +import org.apache.giraph.edge.ByteArrayEdges; +import org.apache.giraph.edge.HashMapEdges; +import org.apache.giraph.edge.LongDoubleArrayEdges; +import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants; +import org.apache.giraph.io.formats.JsonBase64VertexOutputFormat; +import org.apache.giraph.io.formats.PseudoRandomEdgeInputFormat; +import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat; +import org.apache.giraph.partition.SimpleLongRangePartitionerFactory; +import org.apache.hadoop.util.ToolRunner; +import org.apache.log4j.Logger; + +import com.google.common.collect.Sets; + +import java.util.Set; + +/** + * Benchmark for {@link WeightedPageRankVertex} + */ +public class WeightedPageRankBenchmark extends GiraphBenchmark { + /** Class logger */ + private 
static final Logger LOG = + Logger.getLogger(WeightedPageRankBenchmark.class); + + /** Option for VertexEdges class */ + private static final BenchmarkOption EDGES_CLASS = new BenchmarkOption( + "c", "edgesClass", true, + "Vertex edges class (0 for LongDoubleArrayEdges, " + + "1 for ByteArrayEdges (default), " + + "2 for ByteArrayEdges with unsafe serialization, " + + "3 for ArrayListEdges, " + + "4 for HashMapEdges)"); + /** Option for using edge input */ + private static final BenchmarkOption EDGE_INPUT = new BenchmarkOption( + "ei", "edgeInput", false, + "Use edge-based input instead of vertex-based input."); + /** Option for partitioning algorithm */ + private static final BenchmarkOption PARTITIONER = new BenchmarkOption( + "p", "partitioner", true, + "Partitioning algorithm (0 for hash partitioning (default), " + + "1 for range partitioning)"); + /** Option for type of combiner */ + private static final BenchmarkOption COMBINER_TYPE = new BenchmarkOption( + "t", "combinerType", true, + "Combiner type (0 for no combiner, 1 for DoubleSumCombiner (default))"); + /** Option for output format */ + private static final BenchmarkOption OUTPUT_FORMAT = new BenchmarkOption( + "o", "vertexOutputFormat", true, + "0 for JsonBase64VertexOutputFormat"); + + @Override + public Set<BenchmarkOption> getBenchmarkOptions() { + return Sets.newHashSet( + BenchmarkOption.SUPERSTEPS, BenchmarkOption.VERTICES, + BenchmarkOption.EDGES_PER_VERTEX, BenchmarkOption.LOCAL_EDGES_MIN_RATIO, + EDGES_CLASS, EDGE_INPUT, PARTITIONER, COMBINER_TYPE, OUTPUT_FORMAT); + } + + /** + * Set vertex edges, input format, partitioner classes and related parameters + * based on command-line arguments.
+ * + * @param cmd Command line arguments + * @param configuration Giraph job configuration + */ + protected void prepareConfiguration(GiraphConfiguration configuration, + CommandLine cmd) { + configuration.setVertexClass(WeightedPageRankVertex.class); + int edgesClassOption = EDGES_CLASS.getOptionIntValue(cmd, 1); + switch (edgesClassOption) { + case 0: + configuration.setVertexEdgesClass(LongDoubleArrayEdges.class); + break; + case 1: + configuration.setVertexEdgesClass(ByteArrayEdges.class); + break; + case 2: + configuration.setVertexEdgesClass(ByteArrayEdges.class); + configuration.useUnsafeSerialization(true); + break; + case 3: + configuration.setVertexEdgesClass(ArrayListEdges.class); + break; + case 4: + configuration.setVertexEdgesClass(HashMapEdges.class); + break; + default: + LOG.info("Unknown VertexEdges class, " + + "defaulting to LongDoubleArrayEdges"); + configuration.setVertexEdgesClass(LongDoubleArrayEdges.class); + } + + LOG.info("Using edges class " + + GiraphConstants.VERTEX_EDGES_CLASS.get(configuration)); + if (COMBINER_TYPE.getOptionIntValue(cmd, 1) == 1) { + configuration.setVertexCombinerClass(DoubleSumCombiner.class); + } + + if (EDGE_INPUT.optionTurnedOn(cmd)) { + configuration.setEdgeInputFormatClass(PseudoRandomEdgeInputFormat.class); + } else { + configuration.setVertexInputFormatClass( + PseudoRandomVertexInputFormat.class); + } + + configuration.setLong( + PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, + BenchmarkOption.VERTICES.getOptionLongValue(cmd)); + configuration.setLong( + PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, + BenchmarkOption.EDGES_PER_VERTEX.getOptionLongValue(cmd)); + configuration.setFloat( + PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO, + BenchmarkOption.LOCAL_EDGES_MIN_RATIO.getOptionFloatValue(cmd, + PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO_DEFAULT)); + + if (OUTPUT_FORMAT.getOptionIntValue(cmd, -1) == 0) { + LOG.info("Using vertex output format class " + + 
JsonBase64VertexOutputFormat.class.getName()); + configuration.setVertexOutputFormatClass( + JsonBase64VertexOutputFormat.class); + } + + if (PARTITIONER.getOptionIntValue(cmd, 0) == 1) { + configuration.setGraphPartitionerFactoryClass( + SimpleLongRangePartitionerFactory.class); + } + + configuration.setInt(WeightedPageRankVertex.SUPERSTEP_COUNT, + BenchmarkOption.SUPERSTEPS.getOptionIntValue(cmd)); + } + + /** + * Execute the benchmark. + * + * @param args Typically the command line arguments. + * @throws Exception Any exception from the computation. + */ + public static void main(final String[] args) throws Exception { + System.exit(ToolRunner.run(new WeightedPageRankBenchmark(), args)); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankVertex.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankVertex.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankVertex.java new file mode 100644 index 0000000..70f0f61 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankVertex.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.benchmark; + +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.MutableEdge; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; + +import java.io.IOException; + +/** + * Implementation of Page Rank algorithm on a weighted graph. + */ +public class WeightedPageRankVertex extends Vertex<LongWritable, DoubleWritable, + DoubleWritable, DoubleWritable> { + /** Number of supersteps */ + public static final String SUPERSTEP_COUNT = + "giraph.weightedPageRank.superstepCount"; + + @Override + public void compute(Iterable<DoubleWritable> messages) throws IOException { + if (getSuperstep() == 0) { + // Normalize out edge weights + double outEdgeSum = 0; + for (Edge<LongWritable, DoubleWritable> edge : getEdges()) { + outEdgeSum += edge.getValue().get(); + } + for (MutableEdge<LongWritable, DoubleWritable> edge : getMutableEdges()) { + edge.setValue(new DoubleWritable(edge.getValue().get() / outEdgeSum)); + } + } else { + double messageSum = 0; + for (DoubleWritable message : messages) { + messageSum += message.get(); + } + getValue().set((0.15f / getTotalNumVertices()) + 0.85f * messageSum); + } + + if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, 0)) { + for (Edge<LongWritable, DoubleWritable> edge : getEdges()) { + sendMessage(edge.getTargetVertexId(), + new DoubleWritable(getValue().get() * edge.getValue().get())); + } + } else { + voteToHalt(); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumCombiner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumCombiner.java new file mode 
100644 index 0000000..d898791 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumCombiner.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.combiner; + +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; + +/** + * A combiner that sums float-valued messages + */ +public class FloatSumCombiner extends + Combiner<IntWritable, FloatWritable> { + @Override + public void combine(IntWritable vertexIndex, FloatWritable originalMessage, + FloatWritable messageToCombine) { + originalMessage.set(originalMessage.get() + messageToCombine.get()); + } + + @Override + public FloatWritable createInitialMessage() { + return new FloatWritable(0); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/edge/IntNullArrayEdges.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/IntNullArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/IntNullArrayEdges.java new file mode 100644 index 0000000..2363caf --- /dev/null +++ 
b/giraph-core/src/main/java/org/apache/giraph/edge/IntNullArrayEdges.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.edge; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; + +import com.google.common.collect.UnmodifiableIterator; + +import it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.ints.IntIterator; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; + +/** + * Implementation of {@link VertexEdges} with int ids and null edge + * values, backed by dynamic primitive array. + * Parallel edges are allowed. + * Note: this implementation is optimized for space usage, + * but edge removals are expensive. + */ +public class IntNullArrayEdges + implements ReuseObjectsVertexEdges<IntWritable, NullWritable> { + /** Array of target vertex ids */ + private IntArrayList neighbors; + + @Override + public void initialize(Iterable<Edge<IntWritable, NullWritable>> edges) { + // If the iterable is actually a collection, we can cheaply get the + // size and initialize the array with the expected capacity. 
+ if (edges instanceof Collection) { + int numEdges = + ((Collection<Edge<IntWritable, NullWritable>>) edges).size(); + initialize(numEdges); + } else { + initialize(); + } + for (Edge<IntWritable, NullWritable> edge : edges) { + add(edge); + } + } + + @Override + public void initialize(int capacity) { + neighbors = new IntArrayList(capacity); + } + + @Override + public void initialize() { + neighbors = new IntArrayList(); + } + + @Override + public int size() { + return neighbors.size(); + } + + @Override + public void add(Edge<IntWritable, NullWritable> edge) { + neighbors.add(edge.getTargetVertexId().get()); + } + + /** + * Remove edge at position i. + * + * @param i Position of edge to be removed + */ + private void removeAt(int i) { + // The order of the edges is irrelevant, so we can simply replace + // the deleted edge with the rightmost element, thus achieving constant + // time. + if (i == neighbors.size() - 1) { + neighbors.popInt(); + } else { + neighbors.set(i, neighbors.popInt()); + } + } + + @Override + public void remove(IntWritable targetVertexId) { + // Thanks to the constant-time implementation of removeAt(int), + // we can remove all matching edges in linear time. + for (int i = neighbors.size() - 1; i >= 0; --i) { + if (neighbors.get(i) == targetVertexId.get()) { + removeAt(i); + } + } + } + + @Override + public Iterator<Edge<IntWritable, NullWritable>> iterator() { + // Returns an iterator that reuses objects. + return new UnmodifiableIterator<Edge<IntWritable, NullWritable>>() { + /** Wrapped neighbors iterator. */ + private IntIterator neighborsIt = neighbors.iterator(); + /** Representative edge object. 
*/ + private Edge<IntWritable, NullWritable> representativeEdge = + EdgeFactory.create(new IntWritable(), NullWritable.get()); + + @Override + public boolean hasNext() { + return neighborsIt.hasNext(); + } + + @Override + public Edge<IntWritable, NullWritable> next() { + representativeEdge.getTargetVertexId().set(neighborsIt.nextInt()); + return representativeEdge; + } + }; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(neighbors.size()); + IntIterator iterator = neighbors.iterator(); + while (iterator.hasNext()) { + out.writeInt(iterator.nextInt()); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + int numEdges = in.readInt(); + initialize(numEdges); + for (int i = 0; i < numEdges; ++i) { + neighbors.add(in.readInt()); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java index 87cadb4..2cc4dba 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java @@ -20,7 +20,6 @@ package org.apache.giraph.io.formats; import com.google.common.collect.Sets; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.Set; @@ -47,13 +46,7 @@ public class PseudoRandomEdgeInputFormat public final List<InputSplit> getSplits(final JobContext context, final int minSplitCountHint) throws IOException, InterruptedException { - // This is meaningless, the PseudoRandomEdgeReader will generate - // all the test data - List<InputSplit> inputSplitList = new 
ArrayList<InputSplit>(); - for (int i = 0; i < minSplitCountHint; ++i) { - inputSplitList.add(new BspInputSplit(i, minSplitCountHint)); - } - return inputSplitList; + return PseudoRandomUtils.getSplits(minSplitCountHint); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java new file mode 100644 index 0000000..46997a8 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.io.formats; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.partition.PartitionUtils; +import org.apache.giraph.worker.WorkerInfo; + +import java.util.Collections; +import java.util.List; +import java.util.Random; + +/** + * Helper class to generate pseudo-random local edges. + * Like {@link PseudoRandomLocalEdgesHelper}, but for graphs where vertex ids + * are integers. An edge counts as local when its target id is congruent to + * its source id modulo the partition count. NOTE(review): this assumes vertex + * v is placed in partition (v % numPartitions); confirm against the + * configured partitioner. + */ +public class PseudoRandomIntNullLocalEdgesHelper { + /** Minimum ratio of partition-local edges (per-edge probability). */ + private float minLocalEdgesRatio; + /** Total number of vertices. */ + private int numVertices; + /** Total number of partitions. */ + private int numPartitions; + /** Average partition size, numVertices / numPartitions (integer division). */ + private int partitionSize; + + /** + * Constructor. + * + * @param numVertices Total number of vertices. + * @param conf Configuration. + */ + public PseudoRandomIntNullLocalEdgesHelper(int numVertices, + ImmutableClassesGiraphConfiguration conf) { + this.minLocalEdgesRatio = conf.getFloat( + PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO, + PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO_DEFAULT); + this.numVertices = numVertices; + int numWorkers = conf.getMaxWorkers(); + List<WorkerInfo> workerInfos = Collections.nCopies(numWorkers, + new WorkerInfo()); + numPartitions = PartitionUtils.computePartitionCount(workerInfos, + numWorkers, conf); + partitionSize = numVertices / numPartitions; + } + + /** + * Generate a destination vertex id for the given source vertex, + * using the desired configuration for edge locality and the provided + * pseudo-random generator. 
+ * With probability minLocalEdgesRatio the target is drawn from the source's + * residue class modulo numPartitions; otherwise uniformly from all vertices. + * NOTE(review): if numVertices < numPartitions, partitionSize is 0 and the + * local branch's rand.nextInt(0) throws IllegalArgumentException. + * + * @param sourceVertexId Source vertex id. + * @param rand Pseudo-random generator. + * @return Destination vertex id, in [0, numVertices) for a non-negative + * sourceVertexId; may equal sourceVertexId.
+ */ + public int generateDestVertex(int sourceVertexId, Random rand) { + if (rand.nextFloat() < minLocalEdgesRatio) { + int partitionId = sourceVertexId % numPartitions; + return partitionId + numPartitions * rand.nextInt(partitionSize); + } else { + return rand.nextInt(numVertices); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java new file mode 100644 index 0000000..b27fcc8 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.giraph.io.formats; + +import org.apache.giraph.bsp.BspInputSplit; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.ReusableEdge; +import org.apache.giraph.edge.VertexEdges; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.VertexInputFormat; +import org.apache.giraph.io.VertexReader; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; +import it.unimi.dsi.fastutil.ints.IntSet; + +import java.io.IOException; +import java.util.List; +import java.util.Random; + +/** + * VertexInputFormat for large scale testing, + * like {@link PseudoRandomVertexInputFormat}, but for unweighted graphs + * where vertex ids are integers. Every vertex gets value 1.0f and + * edgesPerVertex distinct out-edges with no edge value (NullWritable). + */ +public class PseudoRandomIntNullVertexInputFormat extends + VertexInputFormat<IntWritable, FloatWritable, NullWritable> { + @Override + public final List<InputSplit> getSplits(final JobContext context, + final int minSplitCountHint) throws IOException, InterruptedException { + return PseudoRandomUtils.getSplits(minSplitCountHint); + } + + @Override + public VertexReader<IntWritable, FloatWritable, NullWritable> + createVertexReader(InputSplit split, + TaskAttemptContext context) throws IOException { + return new PseudoRandomVertexReader(); + } + + /** + * Used by {@link PseudoRandomIntNullVertexInputFormat} to read + * pseudo-randomly generated data. + */ + private static class PseudoRandomVertexReader extends + VertexReader<IntWritable, FloatWritable, NullWritable> { + /** First vertex id generated by this split. */ + private int startingVertexId = -1; + /** Vertices read so far. 
*/ + private int verticesRead = 0; + /** + * Total vertices to read (on this split alone). This is 0 for some splits + * when there are more splits than vertices, and getProgress() then + * returns NaN (0f / 0). NOTE(review): getProgress() reports a percentage + * (0-100); confirm callers do not expect a 0-1 fraction.
*/ + private int totalSplitVertices = -1; + /** + * Edges per vertex; all targets of a vertex are distinct. NOTE(review): if + * this exceeds the total vertex count (or the partition size when the local + * edge ratio is at least 1), getCurrentVertex() cannot find enough distinct + * targets and loops forever. + */ + private int edgesPerVertex = -1; + /** Destination ids already used by the current vertex; cleared per vertex. */ + private final IntSet destVertices = new IntOpenHashSet(); + /** Reusable edge object; null unless edge-object reuse is enabled. */ + private ReusableEdge<IntWritable, NullWritable> reusableEdge = null; + /** Helper for generating pseudo-random local edges. */ + private PseudoRandomIntNullLocalEdgesHelper localEdgesHelper; + /** Pseudo-random generator seeded with the split index (deterministic). */ + private Random rand; + + /** Default constructor for reflection. */ + public PseudoRandomVertexReader() { + } + + @Override + public void initialize(InputSplit inputSplit, + TaskAttemptContext context) throws IOException { + int aggregateVertices = getConf().getInt( + PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 0); + BspInputSplit bspInputSplit = (BspInputSplit) inputSplit; + int extraVertices = aggregateVertices % bspInputSplit.getNumSplits(); + totalSplitVertices = aggregateVertices / bspInputSplit.getNumSplits(); + if (bspInputSplit.getSplitIndex() < extraVertices) { + ++totalSplitVertices; + } + startingVertexId = bspInputSplit.getSplitIndex() * + (aggregateVertices / bspInputSplit.getNumSplits()) + + Math.min(bspInputSplit.getSplitIndex(), extraVertices); + edgesPerVertex = getConf().getInt( + PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0); + rand = new Random(bspInputSplit.getSplitIndex()); + if (getConf().reuseEdgeObjects()) { + reusableEdge = getConf().createReusableEdge(); + } + localEdgesHelper = new PseudoRandomIntNullLocalEdgesHelper( + aggregateVertices, getConf()); + } + + @Override + public boolean nextVertex() throws IOException, InterruptedException { + return 
totalSplitVertices > verticesRead; + } + + @Override + public Vertex<IntWritable, FloatWritable, NullWritable, ?> + getCurrentVertex() throws IOException, InterruptedException { + Vertex<IntWritable, FloatWritable, NullWritable, ?> vertex = + getConf().createVertex(); + int vertexId = startingVertexId + verticesRead; + VertexEdges<IntWritable,
NullWritable> edges = + getConf().createVertexEdges(); + edges.initialize(edgesPerVertex); + destVertices.clear(); + for (int i = 0; i < edgesPerVertex; ++i) { + int destVertexId; + do { + destVertexId = localEdgesHelper.generateDestVertex(vertexId, rand); + } while (!destVertices.add(destVertexId)); + Edge<IntWritable, NullWritable> edge = + (reusableEdge == null) ? getConf().createEdge() : reusableEdge; + edge.getTargetVertexId().set(destVertexId); + edges.add(edge); + } + vertex.initialize( + new IntWritable(vertexId), new FloatWritable(1.0f), edges); + ++verticesRead; + return vertex; + } + + @Override + public void close() throws IOException { + } + + @Override + public float getProgress() throws IOException { + return verticesRead * 100.0f / totalSplitVertices; + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomUtils.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomUtils.java new file mode 100644 index 0000000..6def976 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomUtils.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io.formats; + +import org.apache.giraph.bsp.BspInputSplit; +import org.apache.hadoop.mapreduce.InputSplit; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Utility methods shared by the PseudoRandom input formats. + */ +public class PseudoRandomUtils { + /** Static utility class; do not instantiate. */ + private PseudoRandomUtils() { } + + /** + * Create {@code numSplits} {@link BspInputSplit}s with split indices + * 0 to numSplits - 1. The splits carry no input data; the pseudo-random + * readers generate the data themselves. + * + * @param numSplits How many splits to create + * @return Mutable list of {@link BspInputSplit}s; empty if numSplits <= 0 + * @throws IOException declared, but apparently never thrown by this body + * @throws InterruptedException declared, but apparently never thrown here + */ + public static List<InputSplit> getSplits(int numSplits) throws IOException, + InterruptedException { + List<InputSplit> inputSplitList = new ArrayList<InputSplit>(); + for (int i = 0; i < numSplits; ++i) { + inputSplitList.add(new BspInputSplit(i, numSplits)); + } + return inputSplitList; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java 
b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java index dca0271..6703b22 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java @@ -20,7 +20,6 @@ package org.apache.giraph.io.formats; import com.google.common.collect.Sets;
import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.Set; @@ -48,13 +47,7 @@ public class PseudoRandomVertexInputFormat extends @Override public final List<InputSplit> getSplits(final JobContext context, final int minSplitCountHint) throws IOException, InterruptedException { - // This is meaningless, the PseudoRandomVertexReader will generate - // all the test data - List<InputSplit> inputSplitList = new ArrayList<InputSplit>(); - for (int i = 0; i < minSplitCountHint; ++i) { - inputSplitList.add(new BspInputSplit(i, minSplitCountHint)); - } - return inputSplitList; + return PseudoRandomUtils.getSplits(minSplitCountHint); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/96fd0538/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java b/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java index 0117ce9..ae9441e 100644 --- a/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java +++ b/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java @@ -19,7 +19,7 @@ package org.apache.giraph.io; import org.apache.giraph.BspCase; -import org.apache.giraph.benchmark.PageRankVertex; +import org.apache.giraph.benchmark.WeightedPageRankVertex; import org.apache.giraph.conf.GiraphClasses; import org.apache.giraph.io.formats.GiraphFileInputFormat; import org.apache.giraph.io.formats.JsonBase64VertexInputFormat; @@ -62,7 +62,7 @@ public class TestJsonBase64Format extends BspCase { Path outputPath = getTempPath(getCallingMethodName()); GiraphClasses classes = new GiraphClasses(); - classes.setVertexClass(PageRankVertex.class); + classes.setVertexClass(WeightedPageRankVertex.class); classes.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class); 
classes.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class); GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath); @@ -70,24 +70,24 @@ public class TestJsonBase64Format extends BspCase { PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 101); job.getConfiguration().setLong( PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 2); - job.getConfiguration().setInt(PageRankVertex.SUPERSTEP_COUNT, 2); + job.getConfiguration().setInt(WeightedPageRankVertex.SUPERSTEP_COUNT, 2); assertTrue(job.run(true)); Path outputPath2 = getTempPath(getCallingMethodName() + "2"); classes = new GiraphClasses(); - classes.setVertexClass(PageRankVertex.class); + classes.setVertexClass(WeightedPageRankVertex.class); classes.setVertexInputFormatClass(JsonBase64VertexInputFormat.class); classes.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class); job = prepareJob(getCallingMethodName(), classes, outputPath2); - job.getConfiguration().setInt(PageRankVertex.SUPERSTEP_COUNT, 3); + job.getConfiguration().setInt(WeightedPageRankVertex.SUPERSTEP_COUNT, 3); GiraphFileInputFormat.addVertexInputPath( job.getInternalJob().getConfiguration(), outputPath); assertTrue(job.run(true)); Path outputPath3 = getTempPath(getCallingMethodName() + "3"); classes = new GiraphClasses(); - classes.setVertexClass(PageRankVertex.class); + classes.setVertexClass(WeightedPageRankVertex.class); classes.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class); classes.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class); job = prepareJob(getCallingMethodName(), classes, outputPath3); @@ -95,7 +95,7 @@ public class TestJsonBase64Format extends BspCase { PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 101); job.getConfiguration().setLong( PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 2); - job.getConfiguration().setInt(PageRankVertex.SUPERSTEP_COUNT, 5); + job.getConfiguration().setInt(WeightedPageRankVertex.SUPERSTEP_COUNT, 5); assertTrue(job.run(true)); 
Configuration conf = job.getConfiguration();
