Repository: giraph Updated Branches: refs/heads/trunk 51f093764 -> cf6abc09a
GIRAPH-1062: Page rank in Blocks&Pieces Summary: We have some examples of pagerank, but they all have some things missing. Make one which will take sinks into account, have convergence checks, support both weighted and unweighted graphs. Test Plan: mvn clean verify -P hadoop_facebook. We use this app internally Differential Revision: https://reviews.facebook.net/D58059 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/cf6abc09 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/cf6abc09 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/cf6abc09 Branch: refs/heads/trunk Commit: cf6abc09a3b4077646c0ec03ec3d1046acecc57a Parents: 51f0937 Author: Maja Kabiljo <[email protected]> Authored: Wed May 11 16:09:47 2016 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Wed Jun 22 17:51:40 2016 -0700 ---------------------------------------------------------------------- .../library/pagerank/EdgeValueGetter.java | 54 ++++ .../library/pagerank/PageRankBlockFactory.java | 91 ++++++ .../library/pagerank/PageRankBlockUtils.java | 80 +++++ .../pagerank/PageRankConvergenceType.java | 37 +++ ...ageRankInitializeAndNormalizeEdgesPiece.java | 100 +++++++ .../library/pagerank/PageRankIteration.java | 292 ++++++++++++++++++ .../library/pagerank/PageRankSettings.java | 120 ++++++++ .../pagerank/PageRankVertexValueFactory.java | 43 +++ .../pagerank/UnweightedEdgeValueGetter.java | 51 ++++ .../library/pagerank/package-info.java | 21 ++ .../library/pagerank/PageRankTest.java | 293 +++++++++++++++++++ .../giraph/combiner/NullMessageCombiner.java | 39 +++ 12 files changed, 1221 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/cf6abc09/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/EdgeValueGetter.java ---------------------------------------------------------------------- diff --git 
a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/EdgeValueGetter.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/EdgeValueGetter.java new file mode 100644 index 0000000..99aab6e --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/EdgeValueGetter.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.block_app.library.pagerank; + +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import java.io.Serializable; + +/** + * Edge value getter + * + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + */ +public interface EdgeValueGetter<I extends WritableComparable, + V extends Writable, E extends Writable> + extends Serializable { + /** + * Get edge value from a vertex and its edge + * + * @param vertex Vertex + * @param edgeValue Edge value + * @return Edge value as double + */ + double getEdgeValue(Vertex<I, V, E> vertex, E edgeValue); + + /** + * Check if for one vertex all out edges have the same value + * + * @return Whether for one vertex all out edges have the same value + */ + default boolean allVertexEdgesTheSame() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/cf6abc09/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankBlockFactory.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankBlockFactory.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankBlockFactory.java new file mode 100644 index 0000000..b67ce13 --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankBlockFactory.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.block_app.library.pagerank; + +import org.apache.giraph.block_app.framework.AbstractBlockFactory; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.comm.messages.MessageEncodeAndStoreType; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.edge.LongDoubleArrayEdges; +import org.apache.giraph.edge.LongNullArrayEdges; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Block factory for pagerank + */ +public class PageRankBlockFactory extends AbstractBlockFactory<Object> { + @Override + protected Class<? extends WritableComparable> getVertexIDClass( + GiraphConfiguration conf) { + return LongWritable.class; + } + + @Override + protected Class<? extends Writable> getVertexValueClass( + GiraphConfiguration conf) { + return DoubleWritable.class; + } + + @Override + protected Class<? extends Writable> getEdgeValueClass( + GiraphConfiguration conf) { + return PageRankSettings.isWeighted(conf) ? 
+ DoubleWritable.class : NullWritable.class; + } + + @Override + public Block createBlock(GiraphConfiguration conf) { + if (PageRankSettings.isWeighted(conf)) { + return PageRankBlockUtils.<LongWritable, DoubleWritable>weightedPagerank( + (vertex, value) -> vertex.getValue().set(value.get()), + Vertex::getValue, + conf); + } else { + return + PageRankBlockUtils.<LongWritable, DoubleWritable>unweightedPagerank( + (vertex, value) -> vertex.getValue().set(value.get()), + Vertex::getValue, + conf); + } + } + + @Override + public Object createExecutionStage(GiraphConfiguration conf) { + return new Object(); + } + + @Override + protected void additionalInitConfig(GiraphConfiguration conf) { + conf.setVertexValueFactoryClass(PageRankVertexValueFactory.class); + if (PageRankSettings.isWeighted(conf)) { + conf.setOutEdgesClass(LongDoubleArrayEdges.class); + } else { + // Save on network traffic by only sending one message value per worker + GiraphConstants.MESSAGE_ENCODE_AND_STORE_TYPE.setIfUnset( + conf, MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION); + conf.setOutEdgesClass(LongNullArrayEdges.class); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/cf6abc09/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankBlockUtils.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankBlockUtils.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankBlockUtils.java new file mode 100644 index 0000000..a14870b --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankBlockUtils.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.block_app.library.pagerank; + +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.RepeatUntilBlock; +import org.apache.giraph.block_app.framework.block.SequenceBlock; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.function.ObjectTransfer; +import org.apache.giraph.function.vertex.ConsumerWithVertex; +import org.apache.giraph.function.vertex.SupplierFromVertex; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Utility class that helps to construct page rank block + */ +public class PageRankBlockUtils { + /** Do not instantiate */ + private PageRankBlockUtils() { + } + + public static <I extends WritableComparable, V extends Writable> Block + weightedPagerank( + ConsumerWithVertex<I, V, DoubleWritable, DoubleWritable> valueSetter, + SupplierFromVertex<I, V, DoubleWritable, DoubleWritable> valueGetter, + GiraphConfiguration conf) { + return new SequenceBlock( + new PageRankInitializeAndNormalizeEdgesPiece<>(valueSetter, conf), + pagerank(valueSetter, valueGetter, + (vertex, edgeValue) -> edgeValue.get(), conf)); + } + + public static <I extends WritableComparable, V extends Writable> Block + unweightedPagerank( + ConsumerWithVertex<I, 
V, NullWritable, DoubleWritable> valueSetter, + SupplierFromVertex<I, V, NullWritable, DoubleWritable> valueGetter, + GiraphConfiguration conf) { + return pagerank(valueSetter, valueGetter, + (UnweightedEdgeValueGetter<I, V, NullWritable>) + vertex -> + vertex.getNumEdges() == 0 ? 0 : 1.0 / vertex.getNumEdges(), + conf); + } + + public static <I extends WritableComparable, V extends Writable, + E extends Writable> Block pagerank( + ConsumerWithVertex<I, V, E, DoubleWritable> valueSetter, + SupplierFromVertex<I, V, E, DoubleWritable> valueGetter, + EdgeValueGetter<I, V, E> edgeValueGetter, + GiraphConfiguration conf) { + ObjectTransfer<Boolean> haltCondition = new ObjectTransfer<>(); + ObjectTransfer<DoubleWritable> valueTransfer = new ObjectTransfer<>(); + return new SequenceBlock( + new RepeatUntilBlock(PageRankSettings.getIterations(conf) + 1, + new PageRankIteration<>(valueSetter, valueGetter, edgeValueGetter, + valueTransfer, haltCondition, conf), + haltCondition + ) + ); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/cf6abc09/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankConvergenceType.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankConvergenceType.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankConvergenceType.java new file mode 100644 index 0000000..0d68132 --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankConvergenceType.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.block_app.library.pagerank; + +/** + * Convergence types for the pagerank computation. + */ +public enum PageRankConvergenceType { + /** Do not check convergence */ + NO_CONVERGENCE, + /** The sum of the absolute differences */ + SUM_DIFFERENCES, + /** The max of the absolute differences */ + MAX_DIFFERENCES, + /** The sum of the relative differences */ + SUM_RELATIVE_DIFFERENCES, + /** The max of the relative differences */ + MAX_RELATIVE_DIFFERENCES, + /** RMSE difference */ + RMSE_DIFFERENCES; +} http://git-wip-us.apache.org/repos/asf/giraph/blob/cf6abc09/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankInitializeAndNormalizeEdgesPiece.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankInitializeAndNormalizeEdgesPiece.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankInitializeAndNormalizeEdgesPiece.java new file mode 100644 index 0000000..715deb1 --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankInitializeAndNormalizeEdgesPiece.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.block_app.library.pagerank; + +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender; +import org.apache.giraph.combiner.NullMessageCombiner; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.MutableEdge; +import org.apache.giraph.function.vertex.ConsumerWithVertex; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Normalize outgoing edge weight. 
+ * + * @param <I> Vertex id + * @param <V> Vertex value + */ +public class PageRankInitializeAndNormalizeEdgesPiece< + I extends WritableComparable, V extends Writable> + extends Piece<I, V, DoubleWritable, NullWritable, Object> { + /** Consumer which sets pagerank value in vertex */ + private final ConsumerWithVertex<I, V, DoubleWritable, DoubleWritable> + valueSetter; + /** Default initial value pagerank value */ + private final DoubleWritable initialValue; + + /** + * Constructor + * + * @param valueSetter Consumer which sets pagerank value in vertex + * @param conf Configuration + */ + public PageRankInitializeAndNormalizeEdgesPiece( + ConsumerWithVertex<I, V, DoubleWritable, DoubleWritable> valueSetter, + GiraphConfiguration conf) { + this.valueSetter = valueSetter; + initialValue = new DoubleWritable(PageRankSettings.getInitialValue(conf)); + } + + @Override + public VertexSender<I, V, DoubleWritable> getVertexSender( + final BlockWorkerSendApi<I, V, DoubleWritable, NullWritable> workerApi, + Object executionStage) { + final NullWritable reusableMessage = NullWritable.get(); + return vertex -> { + if (vertex.getNumEdges() > 0) { + // Normalize edge weights if vertex has out edges + double weightSum = 0.0; + for (Edge<I, DoubleWritable> edge : vertex.getEdges()) { + weightSum += edge.getValue().get(); + } + for (MutableEdge<I, DoubleWritable> edge : vertex.getMutableEdges()) { + edge.setValue(new DoubleWritable(edge.getValue().get() / weightSum)); + } + // Make sure all the vertices are created + workerApi.sendMessageToAllEdges(vertex, reusableMessage); + } + }; + } + + @Override + public VertexReceiver<I, V, DoubleWritable, NullWritable> getVertexReceiver( + BlockWorkerReceiveApi<I> workerApi, Object executionStage) { + return (vertex, messages) -> { + // Set initial pagerank value on all vertices + valueSetter.apply(vertex, initialValue); + }; + } + + @Override + protected NullMessageCombiner getMessageCombiner( + ImmutableClassesGiraphConfiguration conf) { 
+ return new NullMessageCombiner(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/cf6abc09/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankIteration.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankIteration.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankIteration.java new file mode 100644 index 0000000..d0899f3 --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankIteration.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.block_app.library.pagerank; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi; +import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerAndBroadcastWrapperHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender; +import org.apache.giraph.combiner.MessageCombiner; +import org.apache.giraph.combiner.SumMessageCombiner; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.function.Consumer; +import org.apache.giraph.function.ObjectTransfer; +import org.apache.giraph.function.vertex.ConsumerWithVertex; +import org.apache.giraph.function.vertex.SupplierFromVertex; +import org.apache.giraph.reducers.impl.MaxReduce; +import org.apache.giraph.reducers.impl.SumReduce; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; + +/** + * Single iteration of page rank + * + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + */ +public class PageRankIteration<I extends WritableComparable, + V extends Writable, E extends Writable> extends + Piece<I, V, E, DoubleWritable, Object> { + /** Logger */ + private static final Logger LOG = Logger.getLogger(PageRankIteration.class); + + /** Consumer which sets pagerank value in vertex */ + private final 
ConsumerWithVertex<I, V, E, DoubleWritable> + valueSetter; + /** Supplier which reads pagerank value from vertex */ + private final SupplierFromVertex<I, V, E, DoubleWritable> + valueGetter; + /** Supplier which reads edge value from an edge */ + private final EdgeValueGetter<I, V, E> edgeValueGetter; + /** + * Object transfer for passing new pagerank value between consecutive + * PageRankIteration pieces + */ + private final ObjectTransfer<DoubleWritable> valueTransfer; + /** Consumer which sets halt condition based on convergence criteria */ + private final Consumer<Boolean> haltCondition; + + /** Damping factor */ + private final double dampingFactor; + /** Convergence type */ + private final PageRankConvergenceType convergenceType; + /** Convergence threshold */ + private final float convergenceThreshold; + + /** Sums the absolute errors for each vertex */ + private ReducerHandle<DoubleWritable, DoubleWritable> superstepErrorSum; + /** Maximum of the absolute errors for each vertex */ + private ReducerHandle<DoubleWritable, DoubleWritable> superstepErrorMax; + /** Sums the squared errors for each vertex (used for the RMSE check) */ + private ReducerHandle<DoubleWritable, DoubleWritable> superstepErrorRMSESum; + /** Sums the relative errors for each vertex */ + private ReducerHandle<DoubleWritable, DoubleWritable> + superstepRelativeErrorSum; + /** Maximum of the relative errors for each vertex */ + private ReducerHandle<DoubleWritable, DoubleWritable> + superstepRelativeErrorMax; + /** Number of modified vertices */ + private ReducerHandle<LongWritable, LongWritable> verticesModified; + + /** Sum of pagerank values for sink vertices */ + private final + ReducerAndBroadcastWrapperHandle<DoubleWritable, DoubleWritable> + superstepPageRankSinks = new ReducerAndBroadcastWrapperHandle<>(); + /** Sum of all pagerank values */ + private final + ReducerAndBroadcastWrapperHandle<DoubleWritable, DoubleWritable> + superstepPageRankAll = new ReducerAndBroadcastWrapperHandle<>(); + + /** + * Constructor + * + * @param valueSetter 
Consumer which sets pagerank value in vertex + * @param valueGetter Supplier which reads pagerank value from vertex + * @param edgeValueGetter Supplier which reads edge value from an edge + * @param valueTransfer Object transfer for passing new pagerank value + * between consecutive PageRankIteration pieces + * @param haltCondition Consumer which sets halt condition based on + * convergence criteria + * @param conf Configuration + */ + public PageRankIteration( + ConsumerWithVertex<I, V, E, DoubleWritable> valueSetter, + SupplierFromVertex<I, V, E, DoubleWritable> valueGetter, + EdgeValueGetter<I, V, E> edgeValueGetter, + ObjectTransfer<DoubleWritable> valueTransfer, + ObjectTransfer<Boolean> haltCondition, + GiraphConfiguration conf) { + this.valueSetter = valueSetter; + this.valueGetter = valueGetter; + this.edgeValueGetter = edgeValueGetter; + this.valueTransfer = valueTransfer; + this.haltCondition = haltCondition; + dampingFactor = PageRankSettings.getDampingFactor(conf); + convergenceType = PageRankSettings.getConvergenceType(conf); + convergenceThreshold = PageRankSettings.getConvergenceThreshold(conf); + } + + + @Override + public VertexSender<I, V, E> getVertexSender( + final BlockWorkerSendApi<I, V, E, DoubleWritable> workerApi, + Object executionStage) { + final DoubleWritable message = new DoubleWritable(); + return vertex -> { + DoubleWritable newValue = valueTransfer.get(); + // Update stats + if (newValue != null) { + DoubleWritable oldValue = valueGetter.get(vertex); + double diff = Math.abs(oldValue.get() - newValue.get()); + reduceDouble(superstepErrorSum, diff); + reduceDouble(superstepErrorMax, diff); + + reduceDouble(superstepErrorRMSESum, diff * diff); + if (oldValue.get() > 0) { + reduceDouble(superstepRelativeErrorSum, diff / oldValue.get()); + reduceDouble(superstepRelativeErrorMax, diff / oldValue.get()); + } + valueSetter.apply(vertex, newValue); + reduceLong(verticesModified, 1); + } + + // Send pagerank value to neighbors, or update 
sink sum + DoubleWritable value = valueGetter.get(vertex); + superstepPageRankAll.reduce(value); + if (vertex.getNumEdges() == 0) { // sink vertex + superstepPageRankSinks.reduce(value); + } else { // not a sink + if (value.get() > 0) { + if (edgeValueGetter.allVertexEdgesTheSame()) { + message.set(value.get() * + edgeValueGetter.getEdgeValue(vertex, null)); + workerApi.sendMessageToAllEdges(vertex, message); + } else { + for (Edge<I, E> edge : vertex.getEdges()) { + message.set(value.get() * + edgeValueGetter.getEdgeValue(vertex, edge.getValue())); + workerApi.sendMessage(edge.getTargetVertexId(), message); + } + } + } + } + }; + } + + @Override + public void masterCompute(BlockMasterApi masterApi, Object executionStage) { + if (LOG.isInfoEnabled()) { + LOG.info("Superstep statistics:"); + LOG.info("\t sum_error: " + superstepErrorSum.getReducedValue(masterApi)); + LOG.info("\t max_error: " + superstepErrorMax.getReducedValue(masterApi)); + LOG.info("\t relative_sum_error: " + + superstepRelativeErrorSum.getReducedValue(masterApi)); + LOG.info("\t relative_max_error: " + + superstepRelativeErrorMax.getReducedValue(masterApi)); + LOG.info("\t rmse_error: " + + superstepErrorRMSESum.getReducedValue(masterApi)); + LOG.info("\t sink_sum: " + + superstepPageRankSinks.getReducedValue(masterApi)); + LOG.info("\t all_sum: " + + superstepPageRankAll.getReducedValue(masterApi)); + } + + superstepPageRankSinks.broadcastValue(masterApi); + superstepPageRankAll.broadcastValue(masterApi); + + // For each superstep check the convergence criteria + ReducerHandle<DoubleWritable, DoubleWritable> reducerToCheckForConvergence; + switch (convergenceType) { + case SUM_DIFFERENCES: + reducerToCheckForConvergence = superstepErrorSum; + break; + case MAX_DIFFERENCES: + reducerToCheckForConvergence = superstepErrorMax; + break; + case SUM_RELATIVE_DIFFERENCES: + reducerToCheckForConvergence = superstepRelativeErrorSum; + break; + case MAX_RELATIVE_DIFFERENCES: + 
reducerToCheckForConvergence = superstepRelativeErrorMax; + break; + case RMSE_DIFFERENCES: + reducerToCheckForConvergence = superstepErrorRMSESum; + break; + default: + reducerToCheckForConvergence = null; + break; + } + boolean shouldHalt = reducerToCheckForConvergence != null && + reducerToCheckForConvergence.getReducedValue(masterApi).get() < + convergenceThreshold; + // If halt condition is met and it's not the first iteration + haltCondition.apply(shouldHalt && + verticesModified.getReducedValue(masterApi).get() > 0); + } + + @Override + public VertexReceiver<I, V, E, DoubleWritable> getVertexReceiver( + final BlockWorkerReceiveApi<I> workerApi, Object executionStage) { + double sinkSum = superstepPageRankSinks.getBroadcast(workerApi).get(); + double allSum = superstepPageRankAll.getBroadcast(workerApi).get(); + DoubleWritable reusableDoubleWritable = new DoubleWritable(); + return (vertex, messages) -> { + double newValue = calculateNewValue(workerApi.getTotalNumVertices(), + sinkSum, allSum, messages); + reusableDoubleWritable.set(newValue); + valueTransfer.apply(reusableDoubleWritable); + }; + } + + /** + * Calculates the new value of pagerank at a vertex + * + * @param messages Messages + * @return new value or change + */ + protected double calculateNewValue(long totalVertices, double sinkSum, + double allSum, Iterable<DoubleWritable> messages) { + double sum = 0.0; + for (DoubleWritable message : messages) { + sum += message.get(); + } + // Every vertex also receives equal fraction of pagerank value from sinks + sum += sinkSum / totalVertices; + return dampingFactor * sum + + (1.0 - dampingFactor) * allSum / totalVertices; + } + + @Override + public void registerReducers( + CreateReducersApi reduceApi, Object executionStage) { + super.registerReducers(reduceApi, executionStage); + superstepErrorSum = reduceApi.createLocalReducer(SumReduce.DOUBLE); + superstepErrorRMSESum = reduceApi.createLocalReducer(SumReduce.DOUBLE); + superstepErrorMax = 
reduceApi.createLocalReducer(MaxReduce.DOUBLE); + superstepRelativeErrorSum = reduceApi.createLocalReducer(SumReduce.DOUBLE); + superstepRelativeErrorMax = reduceApi.createLocalReducer(MaxReduce.DOUBLE); + verticesModified = reduceApi.createLocalReducer(SumReduce.LONG); + superstepPageRankSinks.registeredReducer( + reduceApi.createLocalReducer(SumReduce.DOUBLE)); + superstepPageRankAll.registeredReducer( + reduceApi.createLocalReducer(SumReduce.DOUBLE)); + } + + @Override + protected MessageCombiner<? super I, DoubleWritable> getMessageCombiner( + ImmutableClassesGiraphConfiguration conf) { + return SumMessageCombiner.DOUBLE; + } + + @Override + protected boolean allowOneMessageToManyIdsEncoding() { + return edgeValueGetter.allVertexEdgesTheSame(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/cf6abc09/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankSettings.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankSettings.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankSettings.java new file mode 100644 index 0000000..0fe38af --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankSettings.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.block_app.library.pagerank; + +import org.apache.giraph.conf.BooleanConfOption; +import org.apache.giraph.conf.FloatConfOption; +import org.apache.giraph.conf.IntConfOption; +import org.apache.giraph.conf.StrConfOption; +import org.apache.hadoop.conf.Configuration; + +/** + * Configuration options for PageRank algorithm + */ +public class PageRankSettings { + /** Number of iterations. */ + public static final IntConfOption ITERATIONS = new IntConfOption( + "giraph.pagerank.iterations", 10, "Number of iterations"); + /** Damping factor. */ + public static final FloatConfOption DAMPING_FACTOR = new FloatConfOption( + "giraph.pagerank.dampingFactor", 0.85f, "Damping factor"); + /** Initial PageRank value. */ + public static final FloatConfOption INITIAL_VALUE = new FloatConfOption( + "giraph.pagerank.initialValue", 1.0f, "Initial value"); + /** + * Convergence criteria, default is no convergence checking. + * See {@link PageRankConvergenceType} for various types. 
+ */ + public static final StrConfOption CONVERGENCE_TYPE = new StrConfOption( + "giraph.pagerank.convergenceType", + PageRankConvergenceType.NO_CONVERGENCE.toString(), + "Convergence criteria, default is no convergence checking"); + /** The threshold error for convergence */ + public static final FloatConfOption CONVERGENCE_THRESHOLD = + new FloatConfOption( + "giraph.pagerank.convergenceThreshold", 0.00001f, + "The threshold error for convergence"); + /** Whether we are using weighted or unweighted graph */ + public static final BooleanConfOption WEIGHTED_PAGERANK = + new BooleanConfOption("giraph.pagerank.weighted", true, + "Whether to run weighted or unweighted pagerank"); + + /** Don't construct */ + protected PageRankSettings() { } + + /** + * Get number of iterations + * + * @param conf Configuration + * @return num iterations + */ + public static int getIterations(Configuration conf) { + return ITERATIONS.get(conf); + } + + /** + * Get damping factor + * + * @param conf Configuration + * @return damping factor + */ + public static double getDampingFactor(Configuration conf) { + return DAMPING_FACTOR.get(conf); + } + + /** + * Get initial value + * + * @param conf Configuration + * @return initial value + */ + public static double getInitialValue(Configuration conf) { + return INITIAL_VALUE.get(conf); + } + + /** + * Get the type of convergence + * + * @param conf Configuration + * @return The type of convergence + */ + public static PageRankConvergenceType getConvergenceType(Configuration conf) { + return PageRankConvergenceType.valueOf(CONVERGENCE_TYPE.get(conf)); + } + + /** + * Get the convergence threshold error + * + * @param conf Configuration + * @return The convergence threshold + */ + public static float getConvergenceThreshold(Configuration conf) { + return CONVERGENCE_THRESHOLD.get(conf); + } + + /** + * Check whether to use weighted or unweighted pagerank + * + * @param conf Configuration + * @return Whether to use weighted or unweighted pagerank 
+ */ + public static boolean isWeighted(Configuration conf) { + return WEIGHTED_PAGERANK.get(conf); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/cf6abc09/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankVertexValueFactory.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankVertexValueFactory.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankVertexValueFactory.java new file mode 100644 index 0000000..4ef6b99 --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/PageRankVertexValueFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.block_app.library.pagerank; + +import org.apache.giraph.conf.GiraphConfigurationSettable; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.factories.VertexValueFactory; +import org.apache.hadoop.io.DoubleWritable; + +/** + * Factory to create initial PageRank value + */ +public class PageRankVertexValueFactory + implements VertexValueFactory<DoubleWritable>, GiraphConfigurationSettable { + /** Cached initial value. */ + private double initialValue; + + @Override + public void setConf(ImmutableClassesGiraphConfiguration configuration) { + initialValue = PageRankSettings.getInitialValue(configuration); + } + + @Override + public DoubleWritable newInstance() { + return new DoubleWritable(initialValue); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/cf6abc09/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/UnweightedEdgeValueGetter.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/UnweightedEdgeValueGetter.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/UnweightedEdgeValueGetter.java new file mode 100644 index 0000000..f22c6a2 --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/UnweightedEdgeValueGetter.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.block_app.library.pagerank; + +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Edge value getter where all vertex edges have the same edge value + * + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + */ +public interface UnweightedEdgeValueGetter<I extends WritableComparable, + V extends Writable, E extends Writable> extends EdgeValueGetter<I, V, E> { + /** + * Get edge value for out edges from a vertex + * + * @param vertex Vertex + * @return Edge value as double + */ + double getEdgeValue(Vertex<I, V, E> vertex); + + @Override + default double getEdgeValue(Vertex<I, V, E> vertex, E edgeValue) { + return getEdgeValue(vertex); + } + + @Override + default boolean allVertexEdgesTheSame() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/cf6abc09/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/package-info.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/package-info.java new file mode 100644 index 0000000..b39fed5 --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/pagerank/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Page rank implementation + */ +package org.apache.giraph.block_app.library.pagerank; http://git-wip-us.apache.org/repos/asf/giraph/blob/cf6abc09/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/pagerank/PageRankTest.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/pagerank/PageRankTest.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/pagerank/PageRankTest.java new file mode 100644 index 0000000..f8ce74d --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/pagerank/PageRankTest.java @@ -0,0 +1,293 @@ +package org.apache.giraph.block_app.library.pagerank; + +import org.apache.giraph.block_app.framework.BlockUtils; +import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.utils.InternalVertexRunner; +import org.apache.giraph.utils.TestGraph; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import 
org.apache.hadoop.io.NullWritable; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Lists; + +import java.util.AbstractMap; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Pagerank test + */ +public class PageRankTest { + private static final int NUMBER_OF_ITERATIONS = 50; + private static final double PRECISION = 0.0000001; + + public static void testComputation(ExampleGenerator generator) + throws Exception { + GiraphConfiguration conf = new GiraphConfiguration(); + PageRankSettings.ITERATIONS.set(conf, NUMBER_OF_ITERATIONS); + BlockUtils.setAndInitBlockFactoryClass(conf, PageRankBlockFactory.class); + + final WeightedPageRankTestExample example = generator.generate(conf); + + LocalBlockRunner.runAppWithVertexOutput(example.graph, (vertex) -> { + Long id = vertex.getId().get(); + double expected = example.expectedOutput.get(id); + double received = vertex.getValue().get(); + Assert.assertEquals(expected, received, PRECISION); + }); + } + + @Test + public void testCliqueWeightedVertex() throws Exception { + testComputation(PageRankTest::createCliqueExample); + } + + @Test + public void testRingWeightedVertex() throws Exception { + testComputation(PageRankTest::createRingExample); + } + + @Test + public void testOneVertexConnectedToAllWeightedVertex() throws Exception { + testComputation(PageRankTest::createOneVertexConnectedToAllExample); + } + + @Test + public void testAllVerticesConnectedToOneWeightedVertex() throws Exception { + testComputation(PageRankTest::createAllVerticesConnectedToOne); + } + + @Test + public void testSmallChainWeightedVertex() throws Exception { + testComputation(PageRankTest::createSmallChainExample); + } + + @Test + public void compareWithUnweightedPageRank() throws Exception { + int numVertices = 100; + int maxEdges = 50; + float dampingFactor = 0.85f; + + GiraphConfiguration wprConf = new GiraphConfiguration(); + PageRankSettings.WEIGHTED_PAGERANK.set(wprConf, 
true); + PageRankSettings.ITERATIONS.set(wprConf, NUMBER_OF_ITERATIONS); + PageRankSettings.DAMPING_FACTOR.set(wprConf, dampingFactor); + BlockUtils.setAndInitBlockFactoryClass(wprConf, PageRankBlockFactory.class); + + GiraphConfiguration prConf = new GiraphConfiguration(); + PageRankSettings.WEIGHTED_PAGERANK.set(prConf, false); + PageRankSettings.ITERATIONS.set(prConf, NUMBER_OF_ITERATIONS); + PageRankSettings.DAMPING_FACTOR.set(prConf, dampingFactor); + BlockUtils.setAndInitBlockFactoryClass(prConf, PageRankBlockFactory.class); + + TestGraph<LongWritable, DoubleWritable, DoubleWritable> wprGraph = + new TestGraph<>(wprConf); + TestGraph<LongWritable, DoubleWritable, NullWritable> prGraph = + new TestGraph<>(prConf); + for (int i = 0; i < numVertices; i++) { + int[] neighbors = new int[(int) (Math.random() * maxEdges)]; + double[] edgeWeights = new double[neighbors.length]; + for (int j = 0; j < neighbors.length; j++) { + neighbors[j] = (int) (Math.random() * numVertices); + edgeWeights[j] = 1.0; + } + prGraph.addVertex(new LongWritable(i), new DoubleWritable(1.0), + createEdgesWeightless(neighbors)); + wprGraph.addVertex(new LongWritable(i), new DoubleWritable(1.0), + createEdges(neighbors, edgeWeights)); + } + + wprGraph = InternalVertexRunner.runWithInMemoryOutput(wprConf, wprGraph); + + prGraph = InternalVertexRunner.runWithInMemoryOutput(prConf, prGraph); + + for (Vertex<LongWritable, DoubleWritable, DoubleWritable> wprVertex : wprGraph) { + Vertex<LongWritable, DoubleWritable, NullWritable> prVertex = + prGraph.getVertex(wprVertex.getId()); + Assert.assertEquals(prVertex.getValue().get(), wprVertex.getValue().get(), PRECISION); + } + } + + /** + * Creates a map of weighted edges from neighbors and edgeWeights + * + * @param neighbors neighbors + * @param edgeWeights edgeWeights + * @return returns the edges + */ + private static Map.Entry<LongWritable, DoubleWritable>[] createEdges(int[] neighbors, + double[] edgeWeights) { + Map.Entry<LongWritable, 
DoubleWritable>[] edges = new Map.Entry[neighbors.length]; + for (int i = 0; i < neighbors.length; i++) { + edges[i] = new AbstractMap.SimpleEntry<>( + new LongWritable(neighbors[i]), new DoubleWritable(edgeWeights[i])); + } + return edges; + } + + /** + * Creates a map of unweighted edges from neighbors + * + * @param neighbors neighbors + * @return returns the edges + */ + private static Map.Entry<LongWritable, NullWritable>[] createEdgesWeightless(int[] neighbors) { + Map.Entry<LongWritable, NullWritable>[] edges = new Map.Entry[neighbors.length]; + for (int i = 0; i < neighbors.length; i++) { + edges[i] = new AbstractMap.SimpleEntry<>( + new LongWritable(neighbors[i]), NullWritable.get()); + } + return edges; + } + + + + /** + * Helper class for data related to one test case for weighted page rank. + */ + private static class WeightedPageRankTestExample { + TestGraph<LongWritable, DoubleWritable, DoubleWritable> graph; + Map<Long, Double> expectedOutput = new HashMap<>(); + } + + private interface ExampleGenerator { + WeightedPageRankTestExample generate(GiraphConfiguration conf); + } + + /** + * Create test case when graph is a clique. All outgoing edges from one + * vertex have the same weights, so they will be normalized to 1/n, + * and all vertices should have page rank 1. 
+ */ + private static WeightedPageRankTestExample createCliqueExample(GiraphConfiguration conf) { + WeightedPageRankTestExample example = new WeightedPageRankTestExample(); + example.graph = new TestGraph<>(conf); + addVertex(1, new long[] {2, 3, 4}, new double[] {1, 1, 1}, example.graph); + addVertex(2, new long[] {1, 3, 4}, new double[] {2, 2, 2}, example.graph); + addVertex(3, new long[] {1, 2, 4}, new double[] {0.1, 0.1, 0.1}, example.graph); + addVertex(4, new long[] {1, 2, 3}, new double[] {5, 5, 5}, example.graph); + example.expectedOutput.put(1L, 1.0); + example.expectedOutput.put(2L, 1.0); + example.expectedOutput.put(3L, 1.0); + example.expectedOutput.put(4L, 1.0); + return example; + } + + + public static void addVertex(int id, long[] edges, double[] weights, + TestGraph<LongWritable, DoubleWritable, DoubleWritable> graph) { + Vertex<LongWritable, DoubleWritable, DoubleWritable> v = graph.getConf().createVertex(); + v.setConf(graph.getConf()); + v.initialize(new LongWritable(id), new DoubleWritable(), newEdges(edges, weights)); + graph.addVertex(v); + } + + private static Iterable<Edge<LongWritable, DoubleWritable>> newEdges(long[] ids, double[] weights) { + List<Edge<LongWritable, DoubleWritable>> edges = Lists.newArrayListWithCapacity(ids.length); + for (int i = 0; i < ids.length; i++) { + edges.add(EdgeFactory.create(new LongWritable(ids[i]), new DoubleWritable(weights[i]))); + } + return edges; + } + + /** + * Create test case when graph is a simple cycle. All vertices have just + * one outgoing edge, so their weights are all going to be normalized to 1, + * and all vertices should have page rank 1. 
+ */ + private static WeightedPageRankTestExample createRingExample(GiraphConfiguration conf) { + WeightedPageRankTestExample example = new WeightedPageRankTestExample(); + example.graph = new TestGraph<>(conf); + addVertex(1, new long[] {2}, new double[] {1}, example.graph); + addVertex(2, new long[] {3}, new double[] {2}, example.graph); + addVertex(3, new long[] {4}, new double[] {1}, example.graph); + addVertex(4, new long[] {5}, new double[] {5}, example.graph); + addVertex(5, new long[] {6}, new double[] {0.7}, example.graph); + addVertex(6, new long[] {7}, new double[] {2}, example.graph); + addVertex(7, new long[] {8}, new double[] {0.3}, example.graph); + addVertex(8, new long[] {1}, new double[] {5}, example.graph); + + for (long i = 1; i <= 8; i++) { + example.expectedOutput.put(i, 1.0); + } + return example; + } + + /** + * Create test case when we have one vertex X which has outgoing edges to + * all other vertices, and all other vertices have just one outgoing + * edge to X. + * Page rank of X should be (1 + d * (n - 1)) / (d + 1), + * where d is damping factor and n total number of vertices. + * Page rank of some other vertex Y should be 1 - d + d * pr(X) * y, + * where y is normalized weight of edge X->Y. 
+ */ + private static WeightedPageRankTestExample + createOneVertexConnectedToAllExample(GiraphConfiguration conf) { + WeightedPageRankTestExample example = new WeightedPageRankTestExample(); + PageRankSettings.DAMPING_FACTOR.set(conf, 0.85f); + example.graph = new TestGraph<>(conf); + addVertex(1, new long[] {2, 3, 4, 5}, new double[] {1, 2, 3, 4}, example.graph); + addVertex(2, new long[] {1}, new double[] {2}, example.graph); + addVertex(3, new long[] {1}, new double[] {0.1}, example.graph); + addVertex(4, new long[] {1}, new double[] {5}, example.graph); + addVertex(5, new long[] {1}, new double[] {5}, example.graph); + // these values are obtained from eigenvector calculation in numpy + example.expectedOutput.put(1L, 2.37797072308); + example.expectedOutput.put(2L, 0.35220291338); + example.expectedOutput.put(3L, 0.55440585061); + example.expectedOutput.put(4L, 0.75660878784); + example.expectedOutput.put(5L, 0.95881172507); + return example; + } + + /** + * Create test case when we have 4 vertices, A,B,C,D, + * with edges in both directions between A and B, B and C, and C and D. + * If d is damping factor, b weight of edge B->A and c weight of edge + * C->D, the formulas below are calculating the page rank of vertices. 
+ */ + private static WeightedPageRankTestExample createSmallChainExample(GiraphConfiguration conf) { + WeightedPageRankTestExample example = new WeightedPageRankTestExample(); + PageRankSettings.DAMPING_FACTOR.set(conf, 0.9f); + example.graph = new TestGraph<>(conf); + addVertex(1, new long[] {2}, new double[] {3}, example.graph); + addVertex(2, new long[] {1, 3}, new double[] {3, 7}, example.graph); + addVertex(3, new long[] {2, 4}, new double[] {4, 6}, example.graph); + addVertex(4, new long[] {3}, new double[] {5}, example.graph); + // these values are obtained from eigenvector calculation in numpy + example.expectedOutput.put(1L, 0.3762585); + example.expectedOutput.put(2L, 1.0231795); + example.expectedOutput.put(3L, 1.62374149); + example.expectedOutput.put(4L, 0.97682040); + return example; + } + + /** + * Create a test with 4 vertices, 3 vertices are connected to the first + * Tests the code in presence of sinks / dangling vertices + * @return example + */ + private static WeightedPageRankTestExample createAllVerticesConnectedToOne(GiraphConfiguration conf) { + WeightedPageRankTestExample example = new WeightedPageRankTestExample(); + PageRankSettings.DAMPING_FACTOR.set(conf, 0.85f); + example.graph = new TestGraph<>(conf); + addVertex(1, new long[] {}, new double[] {}, example.graph); + addVertex(2, new long[] {1}, new double[] {3}, example.graph); + addVertex(3, new long[] {1}, new double[] {4}, example.graph); + addVertex(4, new long[] {1}, new double[] {5}, example.graph); + // these values are obtained from eigenvector calculation in numpy + example.expectedOutput.put(1L, 2.16793893); + example.expectedOutput.put(2L, 0.61068702); + example.expectedOutput.put(3L, 0.61068702); + example.expectedOutput.put(4L, 0.61068702); + return example; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/cf6abc09/giraph-core/src/main/java/org/apache/giraph/combiner/NullMessageCombiner.java ---------------------------------------------------------------------- 
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/NullMessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/NullMessageCombiner.java new file mode 100644 index 0000000..44cdd2b --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/combiner/NullMessageCombiner.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.combiner; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Keeps only one NullMessage. + */ +public class NullMessageCombiner implements + MessageCombiner<WritableComparable, NullWritable> { + + @Override + public void combine(WritableComparable vertexId, + NullWritable null1, NullWritable null2) { + } + + @Override + public NullWritable createInitialMessage() { + return NullWritable.get(); + } +}
