Updated Branches: refs/heads/trunk 7e48523b5 -> b77acf2ef
GIRAPH-630: Convergence detection broken in o.a.g.examples.PageRankVertex. (ssc via aching) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/b77acf2e Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/b77acf2e Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/b77acf2e Branch: refs/heads/trunk Commit: b77acf2ef54d14b1566e2e471178f1f235130911 Parents: 7e48523 Author: Avery Ching <[email protected]> Authored: Sun Apr 14 14:11:46 2013 -0700 Committer: Avery Ching <[email protected]> Committed: Sun Apr 14 14:21:54 2013 -0700 ---------------------------------------------------------------------- CHANGELOG | 7 +- .../org/apache/giraph/examples/PageRankVertex.java | 4 +- .../apache/giraph/examples/RandomWalkVertex.java | 82 ++++++--------- .../examples/RandomWalkVertexMasterCompute.java | 81 ++++++++++++++ .../apache/giraph/examples/PageRankVertexTest.java | 5 +- .../examples/RandomWalkWithRestartVertexTest.java | 1 - 6 files changed, 119 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/b77acf2e/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 38e6e83..98d5490 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,8 +1,11 @@ Giraph Change Log -Release 1.1 - unreleased +Release 1.0.1 - unreleased -Release 1.0 - 2013-04-12 +Release 1.0.0 - 2013-04-12 + + GIRAPH-630: Convergence detection broken in + o.a.g.examples.PageRankVertex. (ssc via aching) GIRAPH-627: YARN build profile is broken. 
(rvs via aching) http://git-wip-us.apache.org/repos/asf/giraph/blob/b77acf2e/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java index 733ee53..9678b31 100644 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java @@ -47,10 +47,8 @@ public class PageRankVertex extends RandomWalkVertex<NullWritable> { getDanglingProbability() / getTotalNumVertices(); // recompute rank - double rank = (1d - teleportationProbability) * + return (1d - teleportationProbability) * (rankFromNeighbors + danglingContribution) + teleportationProbability / getTotalNumVertices(); - - return rank; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/b77acf2e/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java index 85c6e27..2d2c988 100644 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java @@ -18,14 +18,11 @@ package org.apache.giraph.examples; -import org.apache.giraph.aggregators.DoubleSumAggregator; -import org.apache.giraph.master.DefaultMasterCompute; import org.apache.giraph.edge.Edge; import org.apache.giraph.graph.Vertex; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; -import org.apache.log4j.Logger; import java.io.IOException; @@ -42,17 +39,23 @@ public 
abstract class RandomWalkVertex<E extends Writable> /** Configuration parameter for the teleportation probability */ static final String TELEPORTATION_PROBABILITY = RandomWalkVertex.class .getName() + ".teleportationProbability"; - /** Name of aggregator for collecting the probability of dangling vertices */ + /** Name of aggregator for the probability of dangling vertices */ static final String CUMULATIVE_DANGLING_PROBABILITY = RandomWalkVertex.class .getName() + ".cumulativeDanglingProbability"; + /** Name of aggregator for the probability of all vertices */ + static final String CUMULATIVE_PROBABILITY = RandomWalkVertex.class + .getName() + ".cumulativeProbability"; + /** Name of aggregator for the number of dangling vertices */ + static final String NUM_DANGLING_VERTICES = RandomWalkVertex.class + .getName() + ".numDanglingVertices"; /** Name of aggregator for the L1 norm of the probability difference, used * for covergence detection */ static final String L1_NORM_OF_PROBABILITY_DIFFERENCE = RandomWalkVertex.class .getName() + ".l1NormOfProbabilityDifference"; - /** Logger */ - private static final Logger LOG = Logger.getLogger(RandomWalkVertex.class); /** Reusable {@link DoubleWritable} instance to avoid object instantiation */ private final DoubleWritable doubleWritable = new DoubleWritable(); + /** Reusable {@link LongWritable} for counting dangling vertices */ + private final LongWritable one = new LongWritable(1); /** * Compute an initial probability value for the vertex. Per default, @@ -83,34 +86,50 @@ public abstract class RandomWalkVertex<E extends Writable> double teleportationProbability); /** - * Returns the cumulative probability from dangling nodes. - * @return The cumulative probability from dangling nodes. + * Returns the cumulative probability from dangling vertices. + * @return The cumulative probability from dangling vertices.
*/ protected double getDanglingProbability() { return this.<DoubleWritable>getAggregatedValue( RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY).get(); } + /** + * Returns the cumulative probability of all vertices. + * @return The cumulative probability of all vertices. + */ + protected double getPreviousCumulativeProbability() { + return this.<DoubleWritable>getAggregatedValue( + RandomWalkVertex.CUMULATIVE_PROBABILITY).get(); + } + @Override public void compute(Iterable<DoubleWritable> messages) throws IOException { double stateProbability; if (getSuperstep() > 0) { + double previousStateProbability = getValue().get(); stateProbability = recompute(messages, teleportationProbability()); + // Important: rescale for numerical stability + stateProbability /= getPreviousCumulativeProbability(); + doubleWritable.set(Math.abs(stateProbability - previousStateProbability)); aggregate(L1_NORM_OF_PROBABILITY_DIFFERENCE, doubleWritable); } else { stateProbability = initialProbability(); } - doubleWritable.set(stateProbability); - setValue(doubleWritable); + + getValue().set(stateProbability); + + aggregate(CUMULATIVE_PROBABILITY, getValue()); // Compute dangling node contribution for next superstep if (getNumEdges() == 0) { - aggregate(CUMULATIVE_DANGLING_PROBABILITY, doubleWritable); + aggregate(NUM_DANGLING_VERTICES, one); + aggregate(CUMULATIVE_DANGLING_PROBABILITY, getValue()); } if (getSuperstep() < maxSupersteps()) { @@ -141,45 +160,4 @@ public abstract class RandomWalkVertex<E extends Writable> return ((RandomWalkWorkerContext) getWorkerContext()) .getTeleportationProbability(); } - - /** - * Master compute associated with {@link RandomWalkVertex}. It handles - * dangling nodes.
- */ - public static class RandomWalkVertexMasterCompute extends - DefaultMasterCompute { - - /** threshold for the L1 norm of the state vector difference */ - static final double CONVERGENCE_THRESHOLD = 0.00001; - - @Override - public void compute() { - double danglingContribution = - this.<DoubleWritable>getAggregatedValue( - RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY).get(); - double l1NormOfStateDiff = - this.<DoubleWritable>getAggregatedValue( - RandomWalkVertex.L1_NORM_OF_PROBABILITY_DIFFERENCE).get(); - - LOG.info("[Superstep " + getSuperstep() + "] Dangling contribution = " + - danglingContribution + ", L1 Norm of state vector difference = " + - l1NormOfStateDiff); - - // Convergence check: halt once the L1 norm of the difference between the - // state vectors fall under the threshold - if (getSuperstep() > 1 && l1NormOfStateDiff < CONVERGENCE_THRESHOLD) { - haltComputation(); - } - - } - - @Override - public void initialize() throws InstantiationException, - IllegalAccessException { - registerAggregator(RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY, - DoubleSumAggregator.class); - registerAggregator(RandomWalkVertex.L1_NORM_OF_PROBABILITY_DIFFERENCE, - DoubleSumAggregator.class); - } - } } http://git-wip-us.apache.org/repos/asf/giraph/blob/b77acf2e/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertexMasterCompute.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertexMasterCompute.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertexMasterCompute.java new file mode 100644 index 0000000..9e5dbbf --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertexMasterCompute.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.examples; + +import org.apache.giraph.aggregators.DoubleSumAggregator; +import org.apache.giraph.aggregators.LongSumAggregator; +import org.apache.giraph.master.DefaultMasterCompute; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.log4j.Logger; + +/** + * Master compute associated with {@link RandomWalkVertex}. It handles + * dangling nodes. 
+ */ +public class RandomWalkVertexMasterCompute extends DefaultMasterCompute { + + /** threshold for the L1 norm of the state vector difference */ + static final double CONVERGENCE_THRESHOLD = 0.00001; + + /** logger */ + private static final Logger LOG = + Logger.getLogger(RandomWalkVertexMasterCompute.class); + + @Override + public void compute() { + double danglingContribution = + this.<DoubleWritable>getAggregatedValue( + RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY).get(); + double cumulativeProbability = + this.<DoubleWritable>getAggregatedValue( + RandomWalkVertex.CUMULATIVE_PROBABILITY).get(); + double l1NormOfStateDiff = + this.<DoubleWritable>getAggregatedValue( + RandomWalkVertex.L1_NORM_OF_PROBABILITY_DIFFERENCE).get(); + long numDanglingVertices = + this.<LongWritable>getAggregatedValue( + RandomWalkVertex.NUM_DANGLING_VERTICES).get(); + + LOG.info("[Superstep " + getSuperstep() + "] Dangling contribution = " + + danglingContribution + ", number of dangling vertices = " + + numDanglingVertices + ", cumulative probability = " + + cumulativeProbability + ", L1 Norm of state vector difference = " + + l1NormOfStateDiff); + + // Convergence check: halt once the L1 norm of the difference between the + // state vectors fall below the threshold + if (getSuperstep() > 1 && l1NormOfStateDiff < CONVERGENCE_THRESHOLD) { + haltComputation(); + } + } + + @Override + public void initialize() throws InstantiationException, + IllegalAccessException { + registerAggregator(RandomWalkVertex.NUM_DANGLING_VERTICES, + LongSumAggregator.class); + registerAggregator(RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY, + DoubleSumAggregator.class); + registerAggregator(RandomWalkVertex.CUMULATIVE_PROBABILITY, + DoubleSumAggregator.class); + registerAggregator(RandomWalkVertex.L1_NORM_OF_PROBABILITY_DIFFERENCE, + DoubleSumAggregator.class); + } +} 
http://git-wip-us.apache.org/repos/asf/giraph/blob/b77acf2e/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java index 2e4bbc4..b41bcf6 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java +++ b/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java @@ -43,7 +43,6 @@ public class PageRankVertexTest { String[] graph = new String[] { "1 4 2 3", "2 1", - "3", "4 3 2", "5 2 4" }; @@ -57,8 +56,7 @@ public class PageRankVertexTest { conf.setVertexOutputFormatClass( VertexWithDoubleValueNullEdgeTextOutputFormat.class); conf.setWorkerContextClass(RandomWalkWorkerContext.class); - conf.setMasterComputeClass( - RandomWalkVertex.RandomWalkVertexMasterCompute.class); + conf.setMasterComputeClass(RandomWalkVertexMasterCompute.class); // Run internally Iterable<String> results = InternalVertexRunner.run(conf, graph); @@ -76,4 +74,5 @@ public class PageRankVertexTest { assertEquals(0.06784692727193153, steadyStateProbabilities.get(5l), RandomWalkTestUtils.EPSILON); } + } http://git-wip-us.apache.org/repos/asf/giraph/blob/b77acf2e/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java index a3dbd45..6ecfefe 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java +++ b/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java @@ -20,7 +20,6 @@ package 
org.apache.giraph.examples; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.edge.ByteArrayEdges; -import org.apache.giraph.examples.RandomWalkVertex.RandomWalkVertexMasterCompute; import org.apache.giraph.utils.InternalVertexRunner; import org.junit.Test;
