Repository: flink
Updated Branches:
  refs/heads/master 438276de8 -> cb9e409b7
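The commit below describes a PageRank that handles source and sink vertices (in- or out-degree of zero). The scatter-gather and gather-sum-apply versions that it moves to the examples assume that every page has at least one incoming and one outgoing link. The following is a minimal in-memory sketch in plain Java, not Flink or Gelly code, of one common way to handle sinks: each sink's score is redistributed uniformly over all vertices, so the scores keep summing to 1. That redistribution rule, the class name `SimplePageRank`, and the `pageRank(...)` signature are illustrative assumptions. They are not the API or exact formula of the commit's `link_analysis.PageRank`. As in the documentation diff, each vertex splits its score evenly across its out-links. Iteration stops after `maxIterations`, or once the summed per-vertex score change drops to `threshold` or below.

```java
import java.util.Arrays;

/**
 * Hypothetical in-memory PageRank sketch (not Flink/Gelly code).
 * Assumption: the score held by sink vertices (out-degree zero) is
 * redistributed uniformly to all vertices so that scores keep summing to 1.
 */
public class SimplePageRank {

    /**
     * @param numVertices   vertices are labeled 0 .. numVertices - 1
     * @param edges         directed edges as {source, target} pairs
     * @param beta          damping factor, for example 0.85
     * @param maxIterations upper bound on the number of iterations
     * @param threshold     stop once the summed absolute score change is at most this value
     */
    public static double[] pageRank(int numVertices, int[][] edges, double beta,
                                    int maxIterations, double threshold) {
        int[] outDegree = new int[numVertices];
        for (int[] e : edges) {
            outDegree[e[0]]++;
        }

        // Start from a uniform distribution.
        double[] scores = new double[numVertices];
        Arrays.fill(scores, 1.0 / numVertices);

        for (int iter = 0; iter < maxIterations; iter++) {
            // Total score held by sink vertices, which have no out-links to follow.
            double sinkSum = 0.0;
            for (int v = 0; v < numVertices; v++) {
                if (outDegree[v] == 0) {
                    sinkSum += scores[v];
                }
            }

            // Each vertex splits its score evenly across its out-links.
            double[] next = new double[numVertices];
            for (int[] e : edges) {
                next[e[1]] += scores[e[0]] / outDegree[e[0]];
            }

            // Random-jump share plus the uniformly redistributed sink score.
            double uniform = ((1.0 - beta) + beta * sinkSum) / numVertices;
            double change = 0.0;
            for (int v = 0; v < numVertices; v++) {
                next[v] = uniform + beta * next[v];
                change += Math.abs(next[v] - scores[v]);
            }

            scores = next;
            if (change <= threshold) {
                break;
            }
        }
        return scores;
    }

    public static void main(String[] args) {
        // Chain 0 -> 1 -> 2, where vertex 2 is a sink.
        int[][] chain = {{0, 1}, {1, 2}};
        System.out.println(Arrays.toString(pageRank(3, chain, 0.85, 100, 1e-10)));
    }
}
```

In the chain `0 -> 1 -> 2`, vertex 2 is a sink. Without the redistribution step, its score would leak out on every iteration, and the total would fall below 1.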
[FLINK-4896] [gelly] PageRank algorithm for directed graphs

Adds a PageRank algorithm using Flink transformation that handles source and sink vertices (in- or out-degree of zero). The scatter-gather and gather-sum-apply PageRank implementations are moved to Gelly examples.

This closes #2733


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea14053f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea14053f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea14053f

Branch: refs/heads/master
Commit: ea14053fe32280ffc36e586b5d3712c751fa1f84
Parents: 438276d
Author: Greg Hogan
Authored: Mon Oct 24 16:14:16 2016 -0400
Committer: Greg Hogan
Committed: Thu Mar 2 16:41:00 2017 -0500

----------------------------------------------------------------------
docs/dev/libs/gelly/library_methods.md | 46 +-
.../flink/graph/examples/GSAPageRank.java | 123 +++++
.../apache/flink/graph/examples/PageRank.java | 125 +++++
.../flink/graph/library/PageRankITCase.java | 127 -----
.../graph/test/examples/PageRankITCase.java | 129 +++++
.../apache/flink/graph/library/GSAPageRank.java | 123 -----
.../apache/flink/graph/library/PageRank.java | 125 -----
.../graph/library/link_analysis/Functions.java | 43 ++
.../flink/graph/library/link_analysis/HITS.java | 47 +-
.../graph/library/link_analysis/PageRank.java | 534 +++++++++++++++++++
.../graph/library/link_analysis/HITSTest.java | 82 ++-
.../library/link_analysis/PageRankTest.java | 135 +++++
12 files changed, 1194 insertions(+), 445 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/docs/dev/libs/gelly/library_methods.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/library_methods.md b/docs/dev/libs/gelly/library_methods.md
index e2288b4..94eee2e 100644
--- 
a/docs/dev/libs/gelly/library_methods.md
+++ b/docs/dev/libs/gelly/library_methods.md
@@ -143,29 +143,6 @@ The constructor takes one parameter:
 
 * `maxIterations`: the maximum number of iterations to run.
 
-## PageRank
-
-#### Overview
-An implementation of a simple [PageRank algorithm](https://en.wikipedia.org/wiki/PageRank), using [scatter-gather iterations](#scatter-gather-iterations).
-PageRank is an algorithm that was first used to rank web search engine results. Today, the algorithm and many variations, are used in various graph application domains. The idea of PageRank is that important or relevant pages tend to link to other important pages.
-
-#### Details
-The algorithm operates in iterations, where pages distribute their scores to their neighbors (pages they have links to) and subsequently update their scores based on the partial values they receive. The implementation assumes that each page has at least one incoming and one outgoing link.
-In order to consider the importance of a link from one page to another, scores are divided by the total number of out-links of the source page. Thus, a page with 10 links will distribute 1/10 of its score to each neighbor, while a page with 100 links, will distribute 1/100 of its score to each neighboring page. This process computes what is often called the transition probablities, i.e. the probability that some page will lead to other page while surfing the web. To correctly compute the transition probabilities, this implementation expects the edge values to be initialised to 1.0.
-
-#### Usage
-The algorithm takes as input a `Graph` with any vertex type, `Double` vertex values, and `Double` edge values. Edges values should be initialized to 1.0, in order to correctly compute the transition probabilities. Otherwise, the transition probability for an Edge `(u, v)` will be set to the edge value divided by `u`'s out-degree. The algorithm returns a `DataSet` of vertices, where the vertex value corresponds to assigned rank after convergence (or maximum iterations).
-The constructors take the following parameters:
-
-* `beta`: the damping factor.
-* `maxIterations`: the maximum number of iterations to run.
-
-## GSA PageRank
-
-The algorithm is implemented using [gather-sum-apply iterations](#gather-sum-apply-iterations).
-
-See the [PageRank](#pagerank) library method for implementation details and usage information.
-
 ## Single Source Shortest Paths
 
 #### Overview
@@ -353,6 +330,29 @@ The algorithm takes a directed graph as input and outputs a `DataSet` of `Tuple3
 and authority score. Termination is configured with a maximum number of iterations and/or a convergence threshold
 on the sum of the change in each score for each vertex between iterations.
 
+* `setParallelism`: override the operator parallelism
+
+### PageRank
+
+#### Overview
+[PageRank](https://en.wikipedia.org/wiki/PageRank) is an algorithm that was first used to rank web search engine
+results. Today, the algorithm and many variations are used in various graph application domains. The idea of PageRank is
+that important or relevant vertices tend to link to other important vertices.
+
+#### Details
+The algorithm operates in iterations, where pages distribute their scores to their neighbors (pages they have links to)
+and subsequently update their scores based on the sum of values they receive. In order to consider the importance of a
+link from one page to another, scores are divided by the total number of out-links of the source page. Thus, a page with
+10 links will distribute 1/10 of its score to each neighbor, while a page with 100 links will distribute 1/100 of its
+score to each neighboring page.
+
+#### Usage
+The algorithm takes a directed graph as input and outputs a `DataSet` where each `Result` contains the vertex ID and
+PageRank score. Termination is configured with a maximum number of iterations and/or a convergence threshold
+on the sum of the change in score for each vertex between iterations.
+
+* `setParallelism`: override the operator parallelism
+
 ## Metric
 
 ### Vertex Metrics

http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSAPageRank.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSAPageRank.java
new file mode 100644
index 0000000..db2e4f2
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSAPageRank.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.graph.examples; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.EdgeJoinFunction; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.gsa.ApplyFunction; +import org.apache.flink.graph.gsa.GSAConfiguration; +import org.apache.flink.graph.gsa.GatherFunction; +import org.apache.flink.graph.gsa.Neighbor; +import org.apache.flink.graph.gsa.SumFunction; +import org.apache.flink.types.LongValue; + +/** + * This is an implementation of a simple PageRank algorithm, using a gather-sum-apply iteration. + * The user can define the damping factor and the maximum number of iterations. + * + * The implementation assumes that each page has at least one incoming and one outgoing link. + */ +public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> { + + private double beta; + private int maxIterations; + + /** + * Creates an instance of the GSA PageRank algorithm. + * + * The implementation assumes that each page has at least one incoming and one outgoing link. 
+ * + * @param beta the damping factor + * @param maxIterations the maximum number of iterations + */ + public GSAPageRank(double beta, int maxIterations) { + this.beta = beta; + this.maxIterations = maxIterations; + } + + @Override + public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception { + DataSet<Tuple2<K, LongValue>> vertexOutDegrees = network.outDegrees(); + + Graph<K, Double, Double> networkWithWeights = network + .joinWithEdgesOnSource(vertexOutDegrees, new InitWeights()); + + GSAConfiguration parameters = new GSAConfiguration(); + parameters.setOptNumVertices(true); + + return networkWithWeights.runGatherSumApplyIteration(new GatherRanks(), new SumRanks(), + new UpdateRanks<K>(beta), maxIterations, parameters) + .getVertices(); + } + + // -------------------------------------------------------------------------------------------- + // Page Rank UDFs + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("serial") + private static final class GatherRanks extends GatherFunction<Double, Double, Double> { + + @Override + public Double gather(Neighbor<Double, Double> neighbor) { + double neighborRank = neighbor.getNeighborValue(); + + if(getSuperstepNumber() == 1) { + neighborRank = 1.0 / this.getNumberOfVertices(); + } + + return neighborRank * neighbor.getEdgeValue(); + } + } + + @SuppressWarnings("serial") + private static final class SumRanks extends SumFunction<Double, Double, Double> { + + @Override + public Double sum(Double newValue, Double currentValue) { + return newValue + currentValue; + } + } + + @SuppressWarnings("serial") + private static final class UpdateRanks<K> extends ApplyFunction<K, Double, Double> { + + private final double beta; + + public UpdateRanks(double beta) { + this.beta = beta; + } + + @Override + public void apply(Double rankSum, Double currentValue) { + setResult((1-beta)/this.getNumberOfVertices() + beta * rankSum); + } + } + + 
@SuppressWarnings("serial") + private static final class InitWeights implements EdgeJoinFunction<Double, LongValue> { + + public Double edgeJoin(Double edgeValue, LongValue inputValue) { + return edgeValue / (double) inputValue.getValue(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/PageRank.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/PageRank.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/PageRank.java new file mode 100644 index 0000000..6be8116 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/PageRank.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.examples; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeJoinFunction; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.spargel.GatherFunction; +import org.apache.flink.graph.spargel.MessageIterator; +import org.apache.flink.graph.spargel.ScatterFunction; +import org.apache.flink.graph.spargel.ScatterGatherConfiguration; +import org.apache.flink.types.LongValue; + +/** + * This is an implementation of a simple PageRank algorithm, using a scatter-gather iteration. + * The user can define the damping factor and the maximum number of iterations. + * + * The implementation assumes that each page has at least one incoming and one outgoing link. + */ +public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> { + + private double beta; + private int maxIterations; + + /** + * Creates an instance of the PageRank algorithm. + * + * The implementation assumes that each page has at least one incoming and one outgoing link. 
+ * + * @param beta the damping factor + * @param maxIterations the maximum number of iterations + */ + public PageRank(double beta, int maxIterations) { + this.beta = beta; + this.maxIterations = maxIterations; + } + + @Override + public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception { + DataSet<Tuple2<K, LongValue>> vertexOutDegrees = network.outDegrees(); + + Graph<K, Double, Double> networkWithWeights = network + .joinWithEdgesOnSource(vertexOutDegrees, new InitWeights()); + + ScatterGatherConfiguration parameters = new ScatterGatherConfiguration(); + parameters.setOptNumVertices(true); + + return networkWithWeights.runScatterGatherIteration(new RankMessenger<K>(), + new VertexRankUpdater<K>(beta), maxIterations, parameters) + .getVertices(); + } + + /** + * Distributes the rank of a vertex among all target vertices according to + * the transition probability, which is associated with an edge as the edge + * value. + */ + @SuppressWarnings("serial") + public static final class RankMessenger<K> extends ScatterFunction<K, Double, Double, Double> { + @Override + public void sendMessages(Vertex<K, Double> vertex) { + if (getSuperstepNumber() == 1) { + // initialize vertex ranks + vertex.setValue(1.0 / this.getNumberOfVertices()); + } + + for (Edge<K, Double> edge : getEdges()) { + sendMessageTo(edge.getTarget(), vertex.getValue() * edge.getValue()); + } + } + } + + /** + * Function that updates the rank of a vertex by summing up the partial + * ranks from all incoming messages and then applying the dampening formula. 
+ */ + @SuppressWarnings("serial") + public static final class VertexRankUpdater<K> extends GatherFunction<K, Double, Double> { + private final double beta; + + public VertexRankUpdater(double beta) { + this.beta = beta; + } + + @Override + public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) { + double rankSum = 0.0; + for (double msg : inMessages) { + rankSum += msg; + } + + // apply the dampening factor / random jump + double newRank = (beta * rankSum) + (1 - beta) / this.getNumberOfVertices(); + setNewVertexValue(newRank); + } + } + + @SuppressWarnings("serial") + private static final class InitWeights implements EdgeJoinFunction<Double, LongValue> { + public Double edgeJoin(Double edgeValue, LongValue inputValue) { + return edgeValue / (double) inputValue.getValue(); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java deleted file mode 100644 index 25a3e3f..0000000 --- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.library; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.examples.data.PageRankData; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.Arrays; -import java.util.List; - -@RunWith(Parameterized.class) -public class PageRankITCase extends MultipleProgramsTestBase { - - public PageRankITCase(TestExecutionMode mode) { - super(mode); - } - - @Test - public void testPageRankWithThreeIterations() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Double, Double> inputGraph = Graph.fromDataSet( - PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env); - - List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 3)) - .collect(); - - compareWithDelta(result, 0.01); - } - - @Test - public void testGSAPageRankWithThreeIterations() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Double, Double> inputGraph = Graph.fromDataSet( - PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env); - - List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 3)) - .collect(); - - compareWithDelta(result, 0.01); - } - - 
@Test - public void testPageRankWithThreeIterationsAndNumOfVertices() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Double, Double> inputGraph = Graph.fromDataSet( - PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env); - - List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 3)) - .collect(); - - compareWithDelta(result, 0.01); - } - - @Test - public void testGSAPageRankWithThreeIterationsAndNumOfVertices() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Double, Double> inputGraph = Graph.fromDataSet( - PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env); - - List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 3)) - .collect(); - - compareWithDelta(result, 0.01); - } - - private void compareWithDelta(List<Vertex<Long, Double>> result, double delta) { - - String resultString = ""; - for (Vertex<Long, Double> v : result) { - resultString += v.f0.toString() + "," + v.f1.toString() + "\n"; - } - - String expectedResult = PageRankData.RANKS_AFTER_3_ITERATIONS; - String[] expected = expectedResult.isEmpty() ? new String[0] : expectedResult.split("\n"); - - String[] resultArray = resultString.isEmpty() ? 
new String[0] : resultString.split("\n"); - - Arrays.sort(expected); - Arrays.sort(resultArray); - - for (int i = 0; i < expected.length; i++) { - String[] expectedFields = expected[i].split(","); - String[] resultFields = resultArray[i].split(","); - - double expectedPayLoad = Double.parseDouble(expectedFields[1]); - double resultPayLoad = Double.parseDouble(resultFields[1]); - - Assert.assertTrue("Values differ by more than the permissible delta", - Math.abs(expectedPayLoad - resultPayLoad) < delta); - } - } - - @SuppressWarnings("serial") - private static final class InitMapper implements MapFunction<Long, Double> { - public Double map(Long value) { - return 1.0; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/PageRankITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/PageRankITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/PageRankITCase.java new file mode 100644 index 0000000..41f9a0f --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/PageRankITCase.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.examples; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.examples.GSAPageRank; +import org.apache.flink.graph.examples.PageRank; +import org.apache.flink.graph.examples.data.PageRankData; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.List; + +@RunWith(Parameterized.class) +public class PageRankITCase extends MultipleProgramsTestBase { + + public PageRankITCase(TestExecutionMode mode) { + super(mode); + } + + @Test + public void testPageRankWithThreeIterations() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Double, Double> inputGraph = Graph.fromDataSet( + PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env); + + List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 3)) + .collect(); + + compareWithDelta(result, 0.01); + } + + @Test + public void testGSAPageRankWithThreeIterations() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Double, Double> inputGraph = Graph.fromDataSet( + PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env); + + List<Vertex<Long, Double>> result = 
inputGraph.run(new GSAPageRank<Long>(0.85, 3)) + .collect(); + + compareWithDelta(result, 0.01); + } + + @Test + public void testPageRankWithThreeIterationsAndNumOfVertices() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Double, Double> inputGraph = Graph.fromDataSet( + PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env); + + List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 3)) + .collect(); + + compareWithDelta(result, 0.01); + } + + @Test + public void testGSAPageRankWithThreeIterationsAndNumOfVertices() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Double, Double> inputGraph = Graph.fromDataSet( + PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env); + + List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 3)) + .collect(); + + compareWithDelta(result, 0.01); + } + + private void compareWithDelta(List<Vertex<Long, Double>> result, double delta) { + + String resultString = ""; + for (Vertex<Long, Double> v : result) { + resultString += v.f0.toString() + "," + v.f1.toString() + "\n"; + } + + String expectedResult = PageRankData.RANKS_AFTER_3_ITERATIONS; + String[] expected = expectedResult.isEmpty() ? new String[0] : expectedResult.split("\n"); + + String[] resultArray = resultString.isEmpty() ? 
new String[0] : resultString.split("\n"); + + Arrays.sort(expected); + Arrays.sort(resultArray); + + for (int i = 0; i < expected.length; i++) { + String[] expectedFields = expected[i].split(","); + String[] resultFields = resultArray[i].split(","); + + double expectedPayLoad = Double.parseDouble(expectedFields[1]); + double resultPayLoad = Double.parseDouble(resultFields[1]); + + Assert.assertTrue("Values differ by more than the permissible delta", + Math.abs(expectedPayLoad - resultPayLoad) < delta); + } + } + + @SuppressWarnings("serial") + private static final class InitMapper implements MapFunction<Long, Double> { + public Double map(Long value) { + return 1.0; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java deleted file mode 100644 index ef39395..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.library; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.EdgeJoinFunction; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.gsa.ApplyFunction; -import org.apache.flink.graph.gsa.GSAConfiguration; -import org.apache.flink.graph.gsa.GatherFunction; -import org.apache.flink.graph.gsa.Neighbor; -import org.apache.flink.graph.gsa.SumFunction; -import org.apache.flink.types.LongValue; - -/** - * This is an implementation of a simple PageRank algorithm, using a gather-sum-apply iteration. - * The user can define the damping factor and the maximum number of iterations. - * - * The implementation assumes that each page has at least one incoming and one outgoing link. - */ -public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> { - - private double beta; - private int maxIterations; - - /** - * Creates an instance of the GSA PageRank algorithm. - * - * The implementation assumes that each page has at least one incoming and one outgoing link. 
- * - * @param beta the damping factor - * @param maxIterations the maximum number of iterations - */ - public GSAPageRank(double beta, int maxIterations) { - this.beta = beta; - this.maxIterations = maxIterations; - } - - @Override - public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception { - DataSet<Tuple2<K, LongValue>> vertexOutDegrees = network.outDegrees(); - - Graph<K, Double, Double> networkWithWeights = network - .joinWithEdgesOnSource(vertexOutDegrees, new InitWeights()); - - GSAConfiguration parameters = new GSAConfiguration(); - parameters.setOptNumVertices(true); - - return networkWithWeights.runGatherSumApplyIteration(new GatherRanks(), new SumRanks(), - new UpdateRanks<K>(beta), maxIterations, parameters) - .getVertices(); - } - - // -------------------------------------------------------------------------------------------- - // Page Rank UDFs - // -------------------------------------------------------------------------------------------- - - @SuppressWarnings("serial") - private static final class GatherRanks extends GatherFunction<Double, Double, Double> { - - @Override - public Double gather(Neighbor<Double, Double> neighbor) { - double neighborRank = neighbor.getNeighborValue(); - - if(getSuperstepNumber() == 1) { - neighborRank = 1.0 / this.getNumberOfVertices(); - } - - return neighborRank * neighbor.getEdgeValue(); - } - } - - @SuppressWarnings("serial") - private static final class SumRanks extends SumFunction<Double, Double, Double> { - - @Override - public Double sum(Double newValue, Double currentValue) { - return newValue + currentValue; - } - } - - @SuppressWarnings("serial") - private static final class UpdateRanks<K> extends ApplyFunction<K, Double, Double> { - - private final double beta; - - public UpdateRanks(double beta) { - this.beta = beta; - } - - @Override - public void apply(Double rankSum, Double currentValue) { - setResult((1-beta)/this.getNumberOfVertices() + beta * rankSum); - } - } - - 
@SuppressWarnings("serial") - private static final class InitWeights implements EdgeJoinFunction<Double, LongValue> { - - public Double edgeJoin(Double edgeValue, LongValue inputValue) { - return edgeValue / (double) inputValue.getValue(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java deleted file mode 100644 index bf9b4e9..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.graph.library; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.EdgeJoinFunction; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.spargel.GatherFunction; -import org.apache.flink.graph.spargel.MessageIterator; -import org.apache.flink.graph.spargel.ScatterFunction; -import org.apache.flink.graph.spargel.ScatterGatherConfiguration; -import org.apache.flink.types.LongValue; - -/** - * This is an implementation of a simple PageRank algorithm, using a scatter-gather iteration. - * The user can define the damping factor and the maximum number of iterations. - * - * The implementation assumes that each page has at least one incoming and one outgoing link. - */ -public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> { - - private double beta; - private int maxIterations; - - /** - * Creates an instance of the PageRank algorithm. - * - * The implementation assumes that each page has at least one incoming and one outgoing link. 
- * - * @param beta the damping factor - * @param maxIterations the maximum number of iterations - */ - public PageRank(double beta, int maxIterations) { - this.beta = beta; - this.maxIterations = maxIterations; - } - - @Override - public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception { - DataSet<Tuple2<K, LongValue>> vertexOutDegrees = network.outDegrees(); - - Graph<K, Double, Double> networkWithWeights = network - .joinWithEdgesOnSource(vertexOutDegrees, new InitWeights()); - - ScatterGatherConfiguration parameters = new ScatterGatherConfiguration(); - parameters.setOptNumVertices(true); - - return networkWithWeights.runScatterGatherIteration(new RankMessenger<K>(), - new VertexRankUpdater<K>(beta), maxIterations, parameters) - .getVertices(); - } - - /** - * Distributes the rank of a vertex among all target vertices according to - * the transition probability, which is associated with an edge as the edge - * value. - */ - @SuppressWarnings("serial") - public static final class RankMessenger<K> extends ScatterFunction<K, Double, Double, Double> { - @Override - public void sendMessages(Vertex<K, Double> vertex) { - if (getSuperstepNumber() == 1) { - // initialize vertex ranks - vertex.setValue(1.0 / this.getNumberOfVertices()); - } - - for (Edge<K, Double> edge : getEdges()) { - sendMessageTo(edge.getTarget(), vertex.getValue() * edge.getValue()); - } - } - } - - /** - * Function that updates the rank of a vertex by summing up the partial - * ranks from all incoming messages and then applying the dampening formula. 
- */ - @SuppressWarnings("serial") - public static final class VertexRankUpdater<K> extends GatherFunction<K, Double, Double> { - private final double beta; - - public VertexRankUpdater(double beta) { - this.beta = beta; - } - - @Override - public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) { - double rankSum = 0.0; - for (double msg : inMessages) { - rankSum += msg; - } - - // apply the dampening factor / random jump - double newRank = (beta * rankSum) + (1 - beta) / this.getNumberOfVertices(); - setNewVertexValue(newRank); - } - } - - @SuppressWarnings("serial") - private static final class InitWeights implements EdgeJoinFunction<Double, LongValue> { - public Double edgeJoin(Double edgeValue, LongValue inputValue) { - return edgeValue / (double) inputValue.getValue(); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java new file mode 100644 index 0000000..5bb2f4c --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library.link_analysis; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.types.DoubleValue; + +class Functions { + + /** + * Sum vertices' scores. + * + * @param <T> ID type + */ + @ForwardedFields("0") + static class SumScore<T> + implements ReduceFunction<Tuple2<T, DoubleValue>> { + @Override + public Tuple2<T, DoubleValue> reduce(Tuple2<T, DoubleValue> left, Tuple2<T, DoubleValue> right) + throws Exception { + left.f1.setValue(left.f1.getValue() + right.f1.getValue()); + return left; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java index 1be55f0..ecc1ad7 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java @@ -21,7 +21,6 @@ package org.apache.flink.graph.library.link_analysis; import org.apache.flink.api.common.aggregators.ConvergenceCriterion; import 
org.apache.flink.api.common.aggregators.DoubleSumAggregator; import org.apache.flink.api.common.functions.CoGroupFunction; -import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichJoinFunction; @@ -38,6 +37,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.library.link_analysis.Functions.SumScore; import org.apache.flink.graph.library.link_analysis.HITS.Result; import org.apache.flink.graph.utils.Murmur3_32; import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; @@ -50,8 +50,6 @@ import java.util.Collection; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; /** - * http://www.cs.cornell.edu/home/kleinber/auth.pdf - * * Hyperlink-Induced Topic Search computes two interdependent scores for every * vertex in a directed graph. A good "hub" links to good "authorities" and * good "authorities" are linked from good "hubs". @@ -59,6 +57,8 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; * This algorithm can be configured to terminate either by a limit on the number * of iterations, a convergence threshold, or both. * + * http://www.cs.cornell.edu/home/kleinber/auth.pdf + * * @param <K> graph ID type * @param <VV> vertex value type * @param <EV> edge value type @@ -91,7 +91,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { /** * Hyperlink-Induced Topic Search with a convergence threshold. The algorithm - * terminates When the total change in hub and authority scores over all + * terminates when the total change in hub and authority scores over all * vertices falls to or below the given threshold value. 
* * @param convergenceThreshold convergence threshold for sum of scores @@ -154,13 +154,12 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { return true; } - @Override public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input) throws Exception { DataSet<Tuple2<K, K>> edges = input .getEdges() - .flatMap(new ExtractEdgeIDs<K, EV>()) + .map(new ExtractEdgeIDs<K, EV>()) .setParallelism(parallelism) .name("Extract edge IDs"); @@ -270,15 +269,15 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { */ @ForwardedFields("0; 1") private static class ExtractEdgeIDs<T, ET> - implements FlatMapFunction<Edge<T, ET>, Tuple2<T, T>> { + implements MapFunction<Edge<T, ET>, Tuple2<T, T>> { private Tuple2<T, T> output = new Tuple2<>(); @Override - public void flatMap(Edge<T, ET> value, Collector<Tuple2<T, T>> out) + public Tuple2<T, T> map(Edge<T, ET> value) throws Exception { output.f0 = value.f0; output.f1 = value.f1; - out.collect(output); + return output; } } @@ -308,8 +307,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { * * @param <T> ID type */ - @ForwardedFieldsFirst("0") - @ForwardedFieldsSecond("0") + @ForwardedFields("0") private static class SumScores<T> implements ReduceFunction<Tuple3<T, DoubleValue, DoubleValue>> { @Override @@ -345,23 +343,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { } /** - * Sum vertices' scores. - * - * @param <T> ID type - */ - @ForwardedFieldsFirst("0") - @ForwardedFieldsSecond("0") - private static class SumScore<T> - implements ReduceFunction<Tuple2<T, DoubleValue>> { - @Override - public Tuple2<T, DoubleValue> reduce(Tuple2<T, DoubleValue> left, Tuple2<T, DoubleValue> right) - throws Exception { - left.f1.setValue(left.f1.getValue() + right.f1.getValue()); - return left; - } - } - - /** * The authority score is the sum of hub scores of vertices on in-edges. 
* * @param <T> ID type @@ -469,7 +450,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { private double changeInScores; @Override - public void open(Configuration parameters) throws Exception { + public void open(Configuration parameters) + throws Exception { super.open(parameters); isInitialSuperstep = (getIterationRuntimeContext().getSuperstepNumber() == 1); @@ -477,7 +459,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { } @Override - public void close() throws Exception { + public void close() + throws Exception { super.close(); DoubleSumAggregator agg = getIterationRuntimeContext().getIterationAggregator(CHANGE_IN_SCORES); @@ -498,8 +481,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { /** * Monitors the total change in hub and authority scores over all vertices. - * The iteration terminates when the change in scores compared against the - * prior iteration falls below the given convergence threshold. + * The algorithm terminates when the change in scores compared against the + * prior iteration falls to or below the given convergence threshold. 
* * An optimization of this implementation of HITS is to leave the initial * scores non-normalized; therefore, the change in scores after the first http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java new file mode 100644 index 0000000..514fd4e --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java @@ -0,0 +1,534 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.library.link_analysis; + +import org.apache.flink.api.common.aggregators.ConvergenceCriterion; +import org.apache.flink.api.common.aggregators.DoubleSumAggregator; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichJoinFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; +import org.apache.flink.api.java.operators.IterativeDataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.asm.degree.annotate.directed.EdgeSourceDegrees; +import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees; +import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; +import org.apache.flink.graph.library.link_analysis.Functions.SumScore; +import org.apache.flink.graph.library.link_analysis.PageRank.Result; +import org.apache.flink.graph.utils.GraphUtils; +import org.apache.flink.graph.utils.Murmur3_32; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; +import org.apache.flink.types.DoubleValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import 
java.util.Iterator; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + +/** + * PageRank computes a per-vertex score which is the sum of PageRank scores + * transmitted over in-edges. Each vertex's score is divided evenly among + * out-edges. High-scoring vertices are linked to by other high-scoring + * vertices; this is similar to the 'authority' score in {@link HITS}. + * + * http://ilpubs.stanford.edu:8090/422/1/1999-66.pdf + * + * @param <K> graph ID type + * @param <VV> vertex value type + * @param <EV> edge value type + */ +public class PageRank<K, VV, EV> +extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { + + private static final String VERTEX_COUNT = "vertex count"; + + private static final String SUM_OF_SCORES = "sum of scores"; + + private static final String CHANGE_IN_SCORES = "change in scores"; + + // Required configuration + private final double dampingFactor; + + private int maxIterations; + + private double convergenceThreshold; + + // Optional configuration + private int parallelism = PARALLELISM_DEFAULT; + + /** + * PageRank with a fixed number of iterations. + * + * @param dampingFactor probability of following an out-link, otherwise jump to a random vertex + * @param iterations fixed number of iterations + */ + public PageRank(double dampingFactor, int iterations) { + this(dampingFactor, iterations, Double.MAX_VALUE); + } + + /** + * PageRank with a convergence threshold. The algorithm terminates when the + * change in score over all vertices falls to or below the given threshold value. + * + * @param dampingFactor probability of following an out-link, otherwise jump to a random vertex + * @param convergenceThreshold convergence threshold for sum of scores + */ + public PageRank(double dampingFactor, double convergenceThreshold) { + this(dampingFactor, Integer.MAX_VALUE, convergenceThreshold); + } + + /** + * PageRank with a convergence threshold and a maximum iteration count. 
The + * algorithm terminates after either the given number of iterations or when + * the change in score over all vertices falls to or below the given + * threshold value. + * + * @param dampingFactor probability of following an out-link, otherwise jump to a random vertex + * @param maxIterations maximum number of iterations + * @param convergenceThreshold convergence threshold for sum of scores + */ + public PageRank(double dampingFactor, int maxIterations, double convergenceThreshold) { + Preconditions.checkArgument(0 < dampingFactor && dampingFactor < 1, + "Damping factor must be between zero and one"); + Preconditions.checkArgument(maxIterations > 0, "Number of iterations must be greater than zero"); + Preconditions.checkArgument(convergenceThreshold > 0.0, "Convergence threshold must be greater than zero"); + + this.dampingFactor = dampingFactor; + this.maxIterations = maxIterations; + this.convergenceThreshold = convergenceThreshold; + } + + /** + * Override the operator parallelism. + * + * @param parallelism operator parallelism + * @return this + */ + public PageRank<K, VV, EV> setParallelism(int parallelism) { + this.parallelism = parallelism; + + return this; + } + + @Override + protected String getAlgorithmName() { + return PageRank.class.getName(); + } + + @Override + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { + Preconditions.checkNotNull(other); + + if (! PageRank.class.isAssignableFrom(other.getClass())) { + return false; + } + + PageRank rhs = (PageRank) other; + + // merge configurations + + maxIterations = Math.max(maxIterations, rhs.maxIterations); + convergenceThreshold = Math.min(convergenceThreshold, rhs.convergenceThreshold); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? 
parallelism : Math.min(parallelism, rhs.parallelism)); + + return true; + } + + @Override + public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input) + throws Exception { + // vertex degree + DataSet<Vertex<K, Degrees>> vertexDegree = input + .run(new VertexDegrees<K, VV, EV>() + .setParallelism(parallelism)); + + // vertex count + DataSet<LongValue> vertexCount = GraphUtils.count(vertexDegree); + + // s, t, d(s) + DataSet<Edge<K, LongValue>> edgeSourceDegree = input + .run(new EdgeSourceDegrees<K, VV, EV>() + .setParallelism(parallelism)) + .map(new ExtractSourceDegree<K, EV>()) + .setParallelism(parallelism) + .name("Extract source degree"); + + // vertices with zero in-edges + DataSet<Tuple2<K, DoubleValue>> sourceVertices = vertexDegree + .flatMap(new InitializeSourceVertices<K>()) + .withBroadcastSet(vertexCount, VERTEX_COUNT) + .setParallelism(parallelism) + .name("Initialize source vertex scores"); + + // s, initial pagerank(s) + DataSet<Tuple2<K, DoubleValue>> initialScores = vertexDegree + .map(new InitializeVertexScores<K>()) + .withBroadcastSet(vertexCount, VERTEX_COUNT) + .setParallelism(parallelism) + .name("Initialize scores"); + + IterativeDataSet<Tuple2<K, DoubleValue>> iterative = initialScores + .iterate(maxIterations); + + // s, projected pagerank(s) + DataSet<Tuple2<K, DoubleValue>> vertexScores = iterative + .coGroup(edgeSourceDegree) + .where(0) + .equalTo(0) + .with(new SendScore<K>()) + .setParallelism(parallelism) + .name("Send score") + .groupBy(0) + .reduce(new SumScore<K>()) + .setCombineHint(CombineHint.HASH) + .setParallelism(parallelism) + .name("Sum"); + + // ignored ID, total pagerank + DataSet<Tuple2<K, DoubleValue>> sumOfScores = vertexScores + .reduce(new SumVertexScores<K>()) + .setParallelism(parallelism) + .name("Sum"); + + // s, adjusted pagerank(s) + DataSet<Tuple2<K, DoubleValue>> adjustedScores = vertexScores + .union(sourceVertices) + .setParallelism(parallelism) + .name("Union with source vertices") + .map(new 
AdjustScores<K>(dampingFactor)) + .withBroadcastSet(sumOfScores, SUM_OF_SCORES) + .withBroadcastSet(vertexCount, VERTEX_COUNT) + .setParallelism(parallelism) + .name("Adjust scores"); + + DataSet<Tuple2<K, DoubleValue>> passThrough; + + if (convergenceThreshold < Double.MAX_VALUE) { + passThrough = iterative + .join(adjustedScores) + .where(0) + .equalTo(0) + .with(new ChangeInScores<K>()) + .setParallelism(parallelism) + .name("Change in scores"); + + iterative.registerAggregationConvergenceCriterion(CHANGE_IN_SCORES, new DoubleSumAggregator(), new ScoreConvergence(convergenceThreshold)); + } else { + passThrough = adjustedScores; + } + + return iterative + .closeWith(passThrough) + .map(new TranslateResult<K>()) + .setParallelism(parallelism) + .name("Map result"); + } + + /** + * Remove the unused original edge value and extract the out-degree. + * + * @param <T> ID type + * @param <ET> edge value type + */ + @ForwardedFields("0; 1") + private static class ExtractSourceDegree<T, ET> + implements MapFunction<Edge<T, Tuple2<ET, Degrees>>, Edge<T, LongValue>> { + Edge<T, LongValue> output = new Edge<>(); + + @Override + public Edge<T, LongValue> map(Edge<T, Tuple2<ET, Degrees>> edge) + throws Exception { + output.f0 = edge.f0; + output.f1 = edge.f1; + output.f2 = edge.f2.f1.getOutDegree(); + return output; + } + } + + /** + * Source vertices have no in-edges so have a projected score of 0.0. 
+ * + * @param <T> ID type + */ + @ForwardedFields("0") + private static class InitializeSourceVertices<T> + implements FlatMapFunction<Vertex<T, Degrees>, Tuple2<T, DoubleValue>> { + private Tuple2<T, DoubleValue> output = new Tuple2<>(null, new DoubleValue(0.0)); + + @Override + public void flatMap(Vertex<T, Degrees> vertex, Collector<Tuple2<T, DoubleValue>> out) + throws Exception { + if (vertex.f1.getInDegree().getValue() == 0) { + output.f0 = vertex.f0; + out.collect(output); + } + } + } + + /** + * PageRank scores sum to 1.0 so initialize each vertex with the inverse of + * the number of vertices. + * + * @param <T> ID type + */ + @ForwardedFields("0") + private static class InitializeVertexScores<T> + extends RichMapFunction<Vertex<T, Degrees>, Tuple2<T, DoubleValue>> { + private Tuple2<T, DoubleValue> output = new Tuple2<>(); + + @Override + public void open(Configuration parameters) + throws Exception { + super.open(parameters); + + Collection<LongValue> vertexCount = getRuntimeContext().getBroadcastVariable(VERTEX_COUNT); + output.f1 = new DoubleValue(1.0 / vertexCount.iterator().next().getValue()); + } + + @Override + public Tuple2<T, DoubleValue> map(Vertex<T, Degrees> vertex) + throws Exception { + output.f0 = vertex.f0; + return output; + } + } + + /** + * The PageRank score for each vertex is divided evenly and projected to + * neighbors on out-edges. 
+ * + * @param <T> ID type + */ + @ForwardedFieldsSecond("1->0") + private static class SendScore<T> + implements CoGroupFunction<Tuple2<T, DoubleValue>, Edge<T, LongValue>, Tuple2<T, DoubleValue>> { + private Tuple2<T, DoubleValue> output = new Tuple2<>(null, new DoubleValue()); + + @Override + public void coGroup(Iterable<Tuple2<T, DoubleValue>> vertex, Iterable<Edge<T, LongValue>> edges, Collector<Tuple2<T, DoubleValue>> out) + throws Exception { + Iterator<Edge<T, LongValue>> edgeIterator = edges.iterator(); + + if (edgeIterator.hasNext()) { + Edge<T, LongValue> edge = edgeIterator.next(); + + output.f0 = edge.f1; + output.f1.setValue(vertex.iterator().next().f1.getValue() / edge.f2.getValue()); + out.collect(output); + + while (edgeIterator.hasNext()) { + edge = edgeIterator.next(); + output.f0 = edge.f1; + out.collect(output); + } + } + } + } + + /** + * Sum the PageRank score over all vertices. The vertex ID must be ignored + * but is retained rather than adding another operator. + * + * @param <T> ID type + */ + @ForwardedFields("0") + private static class SumVertexScores<T> + implements ReduceFunction<Tuple2<T, DoubleValue>> { + @Override + public Tuple2<T, DoubleValue> reduce(Tuple2<T, DoubleValue> first, Tuple2<T, DoubleValue> second) + throws Exception { + first.f1.setValue(first.f1.getValue() + second.f1.getValue()); + return first; + } + } + + /** + * Each iteration the per-vertex scores are adjusted with the damping + * factor. Each score is multiplied by the damping factor then added to the + * probability of a "random hop", which is one minus the damping factor. + * + * This operation also accounts for 'sink' vertices, which have no + * out-edges to project score to. The sink scores are computed by taking + * one minus the sum of vertex scores, which also includes precision error. + * This 'missing' score is evenly distributed across vertices as with the + * random hop. 
+ * + * @param <T> ID type + */ + @ForwardedFields("0") + private static class AdjustScores<T> + extends RichMapFunction<Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>> { + private double dampingFactor; + + private long vertexCount; + + private double uniformlyDistributedScore; + + public AdjustScores(double dampingFactor) { + this.dampingFactor = dampingFactor; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + Collection<Tuple2<T, DoubleValue>> sumOfScores = getRuntimeContext().getBroadcastVariable(SUM_OF_SCORES); + // floating point precision error is also included in sumOfSinks + double sumOfSinks = 1 - sumOfScores.iterator().next().f1.getValue(); + + Collection<LongValue> vertexCount = getRuntimeContext().getBroadcastVariable(VERTEX_COUNT); + this.vertexCount = vertexCount.iterator().next().getValue(); + + this.uniformlyDistributedScore = ((1 - dampingFactor) + dampingFactor * sumOfSinks) / this.vertexCount; + } + + @Override + public Tuple2<T, DoubleValue> map(Tuple2<T, DoubleValue> value) throws Exception { + value.f1.setValue(uniformlyDistributedScore + (dampingFactor * value.f1.getValue())); + return value; + } + } + + /** + * Computes the sum of the absolute change in vertex PageRank scores + * between iterations. 
+ * + * @param <T> ID type + */ + @ForwardedFieldsFirst("0") + @ForwardedFieldsSecond("*") + private static class ChangeInScores<T> + extends RichJoinFunction<Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>> { + private double changeInScores; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + changeInScores = 0.0; + } + + @Override + public void close() + throws Exception { + super.close(); + + DoubleSumAggregator agg = getIterationRuntimeContext().getIterationAggregator(CHANGE_IN_SCORES); + agg.aggregate(changeInScores); + } + + @Override + public Tuple2<T, DoubleValue> join(Tuple2<T, DoubleValue> first, Tuple2<T, DoubleValue> second) + throws Exception { + changeInScores += Math.abs(second.f1.getValue() - first.f1.getValue()); + return second; + } + } + + /** + * Monitors the sum of the absolute change in vertex scores. The algorithm + * terminates when the change in scores compared against the prior iteration + * falls to or below the given convergence threshold. + */ + private static class ScoreConvergence + implements ConvergenceCriterion<DoubleValue> { + private double convergenceThreshold; + + public ScoreConvergence(double convergenceThreshold) { + this.convergenceThreshold = convergenceThreshold; + } + + @Override + public boolean isConverged(int iteration, DoubleValue value) { + double val = value.getValue(); + return (val <= convergenceThreshold); + } + } + + /** + * Map the Tuple result to the return type. + * + * @param <T> ID type + */ + @ForwardedFields("0; 1") + private static class TranslateResult<T> + implements MapFunction<Tuple2<T, DoubleValue>, Result<T>> { + private Result<T> output = new Result<>(); + + @Override + public Result<T> map(Tuple2<T, DoubleValue> value) throws Exception { + output.f0 = value.f0; + output.f1 = value.f1; + return output; + } + } + + /** + * Wraps the {@link Tuple2} to encapsulate results from the PageRank algorithm. 
+	 *
+	 * @param <T> ID type
+	 */
+	public static class Result<T>
+	extends Tuple2<T, DoubleValue> {
+		public static final int HASH_SEED = 0x4010af29;
+
+		private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
+
+		public T getVertexId0() {
+			return f0;
+		}
+
+		/**
+		 * Get the PageRank score.
+		 *
+		 * @return the PageRank score
+		 */
+		public DoubleValue getPageRankScore() {
+			return f1;
+		}
+
+		public String toVerboseString() {
+			return "Vertex ID: " + getVertexId0()
+				+ ", PageRank score: " + getPageRankScore();
+		}
+
+		@Override
+		public int hashCode() {
+			return hasher.reset()
+				.hash(f0.hashCode())
+				.hash(f1.getValue())
+				.hash();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
index b09f95c..e9db838 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
@@ -21,8 +21,7 @@ package org.apache.flink.graph.library.link_analysis;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.asm.AsmTestBase;
-import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
-import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.apache.flink.graph.asm.dataset.Collect;
 import org.apache.flink.graph.library.link_analysis.HITS.Result;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
@@ -30,26 +29,43 @@ import org.apache.flink.types.NullValue;
 import org.junit.Test;

 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;

 import static org.junit.Assert.assertEquals;

 public class HITSTest
 extends AsmTestBase {

+	/*
+	 * This test result can be verified with the following Python script.
+
+	import math
+	import networkx as nx
+
+	graph=nx.read_edgelist('directedSimpleGraph.csv', delimiter=',', create_using=nx.DiGraph())
+	hits=nx.algorithms.link_analysis.hits(graph)
+
+	hubbiness_norm=math.sqrt(sum(v*v for v in hits[0].values()))
+	authority_norm=math.sqrt(sum(v*v for v in hits[1].values()))
+
+	for key in sorted(hits[0]):
+		print('{}: {}, {}'.format(key, hits[0][key]/hubbiness_norm, hits[1][key]/authority_norm))
+	 */
 	@Test
 	public void testWithSimpleGraph()
 			throws Exception {
-		DataSet<Result<IntValue>> hits = new HITS<IntValue, NullValue, NullValue>(10)
+		DataSet<Result<IntValue>> hits = new HITS<IntValue, NullValue, NullValue>(20)
 			.run(directedSimpleGraph);

 		List<Tuple2<Double, Double>> expectedResults = new ArrayList<>();
-		expectedResults.add(Tuple2.of(0.5446287864731747, 0.0));
-		expectedResults.add(Tuple2.of(0.0, 0.8363240238999012));
-		expectedResults.add(Tuple2.of(0.6072453524686667, 0.26848532437604833));
-		expectedResults.add(Tuple2.of(0.5446287864731747, 0.39546603929699625));
-		expectedResults.add(Tuple2.of(0.0, 0.26848532437604833));
-		expectedResults.add(Tuple2.of(0.194966796646811, 0.0));
+		expectedResults.add(Tuple2.of(0.544643396306, 0.0));
+		expectedResults.add(Tuple2.of(0.0, 0.836329395866));
+		expectedResults.add(Tuple2.of(0.607227031134, 0.268492526138));
+		expectedResults.add(Tuple2.of(0.544643396306, 0.395444899355));
+		expectedResults.add(Tuple2.of(0.0, 0.268492526138));
+		expectedResults.add(Tuple2.of(0.194942233447, 0.0));

 		for (Result<IntValue> result : hits.collect()) {
 			int id = result.f0.getValue();
@@ -76,17 +92,53 @@ extends AsmTestBase {
 		}
 	}

+	/*
+	 * This test result can be verified with the following Python script.
+
+	import math
+	import networkx as nx
+
+	graph=nx.read_edgelist('directedRMatGraph.csv', delimiter=',', create_using=nx.DiGraph())
+	hits=nx.algorithms.link_analysis.hits(graph)
+
+	hubbiness_norm=math.sqrt(sum(v*v for v in hits[0].values()))
+	authority_norm=math.sqrt(sum(v*v for v in hits[1].values()))
+
+	for key in [0, 1, 2, 8, 13, 29, 109, 394, 652, 1020]:
+		print('{}: {}, {}'.format(key, hits[0][str(key)]/hubbiness_norm, hits[1][str(key)]/authority_norm))
+	 */
 	@Test
 	public void testWithRMatGraph()
 			throws Exception {
 		DataSet<Result<LongValue>> hits = directedRMatGraph
-			.run(new HITS<LongValue, NullValue, NullValue>(1));
+			.run(new HITS<LongValue, NullValue, NullValue>(0.000001));

-		Checksum checksum = new ChecksumHashCode<Result<LongValue>>()
-			.run(hits)
-			.execute();
+		Map<Long, Result<LongValue>> results = new HashMap<>();
+		for (Result<LongValue> result : new Collect<Result<LongValue>>().run(hits).execute()) {
+			results.put(result.f0.getValue(), result);
+		}

-		assertEquals(902, checksum.getCount());
-		assertEquals(0x000001cbba6dbcd0L, checksum.getChecksum());
+		assertEquals(902, results.size());
+
+		Map<Long, Tuple2<Double, Double>> expectedResults = new HashMap<>();
+		// a pseudo-random selection of results, both high and low
+		expectedResults.put(0L, Tuple2.of(0.231077034747, 0.238110214937));
+		expectedResults.put(1L, Tuple2.of(0.162364053933, 0.169679504287));
+		expectedResults.put(2L, Tuple2.of(0.162412612499, 0.161015667261));
+		expectedResults.put(8L, Tuple2.of(0.167064641724, 0.158592966505));
+		expectedResults.put(13L, Tuple2.of(0.041915595624, 0.0407091625629));
+		expectedResults.put(29L, Tuple2.of(0.0102017346511, 0.0146218045999));
+		expectedResults.put(109L, Tuple2.of(0.00190531000389, 0.00481944993023));
+		expectedResults.put(394L, Tuple2.of(0.0122287016161, 0.0147987969538));
+		expectedResults.put(652L, Tuple2.of(0.010966659242, 0.0113713306749));
+		expectedResults.put(1020L, Tuple2.of(0.0, 0.000326973732127));
+
+		for (Map.Entry<Long, Tuple2<Double, Double>> expected : expectedResults.entrySet()) {
+			double hubScore = results.get(expected.getKey()).getHubScore().getValue();
+			double authorityScore = results.get(expected.getKey()).getAuthorityScore().getValue();
+
+			assertEquals(expected.getValue().f0, hubScore, 0.00001);
+			assertEquals(expected.getValue().f1, authorityScore, 0.00001);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/PageRankTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/PageRankTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/PageRankTest.java
new file mode 100644
index 0000000..082b6ad
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/PageRankTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.link_analysis;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.Collect;
+import org.apache.flink.graph.library.link_analysis.PageRank.Result;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class PageRankTest
+extends AsmTestBase {
+
+	private static final double DAMPING_FACTOR = 0.85;
+
+	/*
+	 * This test result can be verified with the following Python script.
+
+	import networkx as nx
+
+	graph=nx.read_edgelist('directedSimpleGraph.csv', delimiter=',', create_using=nx.DiGraph())
+	pagerank=nx.algorithms.link_analysis.pagerank(graph)
+
+	for key in sorted(pagerank):
+		print('{}: {}'.format(key, pagerank[key]))
+	 */
+	@Test
+	public void testWithSimpleGraph()
+			throws Exception {
+		DataSet<Result<IntValue>> pr = new PageRank<IntValue, NullValue, NullValue>(DAMPING_FACTOR, 10)
+			.run(directedSimpleGraph);
+
+		List<Double> expectedResults = new ArrayList<>();
+		expectedResults.add(0.09091296131286301);
+		expectedResults.add(0.27951855944178117);
+		expectedResults.add(0.12956847924535586);
+		expectedResults.add(0.22329643739217675);
+		expectedResults.add(0.18579060129496028);
+		expectedResults.add(0.09091296131286301);
+
+		for (Tuple2<IntValue, DoubleValue> result : pr.collect()) {
+			int id = result.f0.getValue();
+			assertEquals(expectedResults.get(id), result.f1.getValue(), 0.000001);
+		}
+	}
+
+	@Test
+	public void testWithCompleteGraph()
+			throws Exception {
+		double expectedScore = 1.0 / completeGraphVertexCount;
+
+		DataSet<Result<LongValue>> pr = new PageRank<LongValue, NullValue, NullValue>(DAMPING_FACTOR, 0.000001)
+			.run(completeGraph);
+
+		List<Result<LongValue>> results = pr.collect();
+
+		assertEquals(completeGraphVertexCount, results.size());
+
+		for (Tuple2<LongValue, DoubleValue> result : results) {
+			assertEquals(expectedScore, result.f1.getValue(), 0.000001);
+		}
+	}
+
+	/*
+	 * This test result can be verified with the following Python script.
+
+	import networkx as nx
+
+	graph=nx.read_edgelist('directedRMatGraph.csv', delimiter=',', create_using=nx.DiGraph())
+	pagerank=nx.algorithms.link_analysis.pagerank(graph)
+
+	for key in [0, 1, 2, 8, 13, 29, 109, 394, 652, 1020]:
+		print('{}: {}'.format(key, pagerank[str(key)]))
+	 */
+	@Test
+	public void testWithRMatGraph()
+			throws Exception {
+		DataSet<Result<LongValue>> pr = new PageRank<LongValue, NullValue, NullValue>(DAMPING_FACTOR, 0.000001)
+			.run(directedRMatGraph);
+
+		Map<Long, Result<LongValue>> results = new HashMap<>();
+		for (Result<LongValue> result : new Collect<Result<LongValue>>().run(pr).execute()) {
+			results.put(result.getVertexId0().getValue(), result);
+		}
+
+		assertEquals(902, results.size());
+
+		Map<Long, Double> expectedResults = new HashMap<>();
+		// a pseudo-random selection of results, both high and low
+		expectedResults.put(0L, 0.027111807822);
+		expectedResults.put(1L, 0.0132842310382);
+		expectedResults.put(2L, 0.0121818392504);
+		expectedResults.put(8L, 0.0115916809743);
+		expectedResults.put(13L, 0.00183249490033);
+		expectedResults.put(29L, 0.000848095047082);
+		expectedResults.put(109L, 0.000308507844048);
+		expectedResults.put(394L, 0.000828743280246);
+		expectedResults.put(652L, 0.000684102931253);
+		expectedResults.put(1020L, 0.000250487135148);
+
+		for (Map.Entry<Long, Double> expected : expectedResults.entrySet()) {
+			double value = results.get(expected.getKey()).getPageRankScore().getValue();
+
+			assertEquals(expected.getValue(), value, 0.00001);
+		}
+	}
+}
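
The PageRank tests compare against NetworkX's `pagerank`, which by default spreads the score held by sink vertices (out-degree zero) uniformly over all vertices. The sketch below shows that computation as an in-memory power iteration. Everything in it is an assumption for illustration: the class `PageRankSketch`, the method `pageRank`, and its L1-change stopping rule are hypothetical and are not Gelly's DataSet implementation. The only claim is that the sketch mirrors the NetworkX convention the test scripts rely on. It also demonstrates the property that `testWithCompleteGraph` asserts, namely that every vertex of a complete graph scores 1/n.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch (not Gelly's implementation): PageRank by power iteration
 * over an in-memory adjacency list. Score held by sink vertices is spread
 * uniformly over all vertices, matching NetworkX's default dangling handling.
 */
public class PageRankSketch {

	/**
	 * @param outEdges      outEdges[v] lists the targets of v's out-edges
	 * @param dampingFactor probability of following an out-edge, e.g. 0.85
	 * @param threshold     stop once the L1 change between iterations is below this
	 * @param maxIterations upper bound on the number of iterations
	 */
	public static double[] pageRank(int[][] outEdges, double dampingFactor,
			double threshold, int maxIterations) {
		int n = outEdges.length;
		double[] scores = new double[n];
		Arrays.fill(scores, 1.0 / n);

		for (int iteration = 0; iteration < maxIterations; iteration++) {
			double[] next = new double[n];

			// sinks have no out-edges, so collect their score for redistribution
			double sinkScore = 0.0;
			for (int v = 0; v < n; v++) {
				if (outEdges[v].length == 0) {
					sinkScore += scores[v];
				} else {
					double share = scores[v] / outEdges[v].length;
					for (int target : outEdges[v]) {
						next[target] += share;
					}
				}
			}

			// every vertex, including sources (in-degree zero), receives the
			// random-jump term plus an equal share of the damped sink score
			double base = (1.0 - dampingFactor) / n + dampingFactor * sinkScore / n;
			double change = 0.0;
			for (int v = 0; v < n; v++) {
				next[v] = dampingFactor * next[v] + base;
				change += Math.abs(next[v] - scores[v]);
			}

			scores = next;
			if (change < threshold) {
				break;
			}
		}
		return scores;
	}

	public static void main(String[] args) {
		// complete directed graph on 4 vertices: each score should be about 1/4
		int n = 4;
		int[][] complete = new int[n][];
		for (int v = 0; v < n; v++) {
			int[] targets = new int[n - 1];
			int i = 0;
			for (int u = 0; u < n; u++) {
				if (u != v) {
					targets[i++] = u;
				}
			}
			complete[v] = targets;
		}
		System.out.println(Arrays.toString(pageRank(complete, 0.85, 1e-6, 100)));
	}
}
```

Because sink score is fed back into every vertex, the scores keep summing to 1 even when the graph has sinks. For a two-vertex graph 0 -> 1, vertex 1 is a sink, and the fixed point works out to a score of 0.5 / 1.425 for vertex 0.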
