Repository: flink Updated Branches: refs/heads/master 7160a6812 -> 8da1a75ce
[FLINK-3906] [gelly] Global Clustering Coefficient The global clustering coefficient measures the connectedness of a graph. Scores range from 0.0 (no triangles) to 1.0 (complete graph). This closes #1997 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8da1a75c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8da1a75c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8da1a75c Branch: refs/heads/master Commit: 8da1a75ceb30ef1bce27a2426dab3a0f66b94b64 Parents: 7160a68 Author: Greg Hogan <c...@greghogan.com> Authored: Tue May 17 10:02:47 2016 -0400 Committer: Greg Hogan <c...@greghogan.com> Committed: Tue Jun 7 12:26:15 2016 -0400 ---------------------------------------------------------------------- docs/apis/batch/libs/gelly.md | 38 ++- .../graph/library/TriangleCountITCase.java | 53 ---- .../org/apache/flink/graph/scala/Graph.scala | 18 +- .../flink/graph/AbstractGraphAnalytic.java | 62 +++++ .../main/java/org/apache/flink/graph/Graph.java | 32 ++- .../org/apache/flink/graph/GraphAnalytic.java | 74 ++++++ .../flink/graph/library/GSATriangleCount.java | 192 -------------- .../undirected/GlobalClusteringCoefficient.java | 155 ++++++++++++ .../undirected/LocalClusteringCoefficient.java | 2 +- .../clustering/undirected/TriangleCount.java | 78 ++++++ .../clustering/undirected/TriangleListing.java | 4 +- .../metric/undirected/VertexMetrics.java | 251 +++++++++++++++++++ .../GlobalClusteringCoefficientTest.java | 84 +++++++ .../LocalClusteringCoefficientTest.java | 8 +- .../undirected/TriangleCountTest.java | 75 ++++++ .../metric/undirected/VertexMetricsTest.java | 97 +++++++ 16 files changed, 954 insertions(+), 269 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/docs/apis/batch/libs/gelly.md ---------------------------------------------------------------------- diff --git 
a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md index 7adff04..45fdbe5 100644 --- a/docs/apis/batch/libs/gelly.md +++ b/docs/apis/batch/libs/gelly.md @@ -1828,12 +1828,13 @@ Gelly has a growing collection of graph algorithms for easily analyzing large-sc * [GSA PageRank](#gsa-pagerank) * [Single Source Shortest Paths](#single-source-shortest-paths) * [GSA Single Source Shortest Paths](#gsa-single-source-shortest-paths) -* [GSA Triangle Count](#gsa-triangle-count) +* [Triangle Count](#triangle-count) * [Triangle Enumerator](#triangle-enumerator) * [Hyperlink-Induced Topic Search](#hyperlink-induced-topic-search) * [Summarization](#summarization) * [Jaccard Index](#jaccard-index) * [Local Clustering Coefficient](#local-clustering-coefficient) +* [Global Clustering Coefficient](#global-clustering-coefficient) Gelly's library methods can be used by simply calling the `run()` method on the input graph: @@ -1997,19 +1998,23 @@ The algorithm is implemented using [gather-sum-apply iterations](#gather-sum-app See the [Single Source Shortest Paths](#single-source-shortest-paths) library method for implementation details and usage information. -### GSA Triangle Count +### Triangle Count #### Overview -An implementation of the Triangle Count algorithm. Given an input graph, it returns the number of unique triangles in it. +An analytic for counting the number of unique triangles in a graph. #### Details -This algorithm operates in three phases. First, vertices select neighbors with IDs greater than theirs -and send messages to them. Each received message is then propagated to neighbors with higher IDs. -Finally, if a node encounters the target ID in the list of received messages, it increments the number of discovered triangles. +Counts the triangles generated by [Triangle Listing](#triangle-listing). 
#### Usage -The algorithm takes an undirected, unweighted graph as input and outputs a `DataSet` which contains a single integer corresponding to the number of triangles -in the graph. The algorithm constructor takes no arguments. +The analytic takes an undirected graph as input and returns as a result a `Long` corresponding to the number of triangles +in the graph. The graph ID type must be `Comparable` and `Copyable`. + +### Triangle Listing + +This algorithm supports object reuse. The graph ID type must be `Comparable` and `Copyable`. + +See the [Triangle Enumerator](#triangle-enumerator) library method for implementation details. ### Triangle Enumerator @@ -2108,7 +2113,22 @@ See the [Triangle Enumeration](#triangle-enumeration) library method for a detai #### Usage The algorithm takes a simple, undirected graph as input and outputs a `DataSet` of tuples containing the vertex ID, -vertex degree, and number of triangles containing the vertex. The vertex ID must be `Comparable` and `Copyable`. +vertex degree, and number of triangles containing the vertex. The graph ID type must be `Comparable` and `Copyable`. + +### Global Clustering Coefficient + +#### Overview +The global clustering coefficient measures the connectedness of a graph. Scores range from 0.0 (no edges between +neighbors) to 1.0 (complete graph). + +#### Details +See the [Local Clustering Coefficient](#local-clustering-coefficient) library method for a detailed explanation of +clustering coefficient. + +#### Usage +The algorithm takes a simple, undirected graph as input and outputs a result containing the total number of triplets and +triangles in the graph. The graph ID type must be `Comparable` and `Copyable`. 
+ {% top %} http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java deleted file mode 100644 index aaada8f..0000000 --- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.graph.library; - -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.examples.data.TriangleCountData; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.types.NullValue; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.List; - -@RunWith(Parameterized.class) -public class TriangleCountITCase extends MultipleProgramsTestBase { - - public TriangleCountITCase(TestExecutionMode mode) { - super(mode); - } - - @Test - public void testGSATriangleCount() throws Exception { - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env), - env).getUndirected(); - - List<Integer> numberOfTriangles = graph.run(new GSATriangleCount<Long, NullValue, NullValue>()).collect(); - String expectedResult = TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES; - - Assert.assertEquals(numberOfTriangles.get(0).intValue(), Integer.parseInt(expectedResult)); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala index f31619d..3881aae 100644 --- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala +++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala @@ -1103,19 +1103,35 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { * @return a Dataset of Tuple2, with one tuple per vertex. 
* The first field of the Tuple2 is the vertex ID and the second field * is the aggregate value computed by the provided [[ReduceNeighborsFunction]]. - */ + */ def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV], direction: EdgeDirection): DataSet[(K, EV)] = { wrap(jgraph.reduceOnEdges(reduceEdgesFunction, direction)).map(jtuple => (jtuple.f0, jtuple.f1)) } + /** + * @param algorithm the algorithm to run on the Graph + * @return the result of the graph algorithm + */ def run[T: TypeInformation : ClassTag](algorithm: GraphAlgorithm[K, VV, EV, T]): T = { jgraph.run(algorithm) } /** + * A GraphAnalytic is similar to a GraphAlgorithm but is terminal and results + * are retrieved via accumulators. A Flink program has a single point of + * execution. A GraphAnalytic defers execution to the user to allow composing + * multiple analytics and algorithms into a single program. + * + * @param analytic the analytic to run on the Graph + */ + def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, T])= { + jgraph.run(analytic) + } + + /** * Runs a scatter-gather iteration on the graph. * No configuration options are provided. * http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java new file mode 100644 index 0000000..b13e82e --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.util.Preconditions; + +/** + * Base class for {@link GraphAnalytic}. + * + * @param <K> key type + * @param <VV> vertex value type + * @param <EV> edge value type + * @param <T> the return type + */ +public abstract class AbstractGraphAnalytic<K, VV, EV, T> +implements GraphAnalytic<K, VV, EV, T> { + + protected ExecutionEnvironment env; + + @Override + public GraphAnalytic<K, VV, EV, T> run(Graph<K, VV, EV> input) + throws Exception { + env = input.getContext(); + return null; + } + + @Override + public T execute() + throws Exception { + Preconditions.checkNotNull(env); + + env.execute(); + return getResult(); + } + + @Override + public T execute(String jobName) + throws Exception { + Preconditions.checkNotNull(jobName); + Preconditions.checkNotNull(env); + + env.execute(jobName); + return getResult(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index b17f7a5..dd25cfd 100755 --- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -18,20 +18,13 @@ package org.apache.flink.graph; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.List; -import java.util.Arrays; - import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatJoinFunction; -import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; @@ -71,6 +64,13 @@ import org.apache.flink.graph.validation.GraphValidator; import org.apache.flink.types.NullValue; import org.apache.flink.util.Collector; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + /** * Represents a Graph consisting of {@link Edge edges} and {@link Vertex * vertices}. @@ -1788,6 +1788,20 @@ public class Graph<K, VV, EV> { } /** + * A {@code GraphAnalytic} is similar to a {@link GraphAlgorithm} but is terminal + * and results are retrieved via accumulators. A Flink program has a single + * point of execution. A {@code GraphAnalytic} defers execution to the user to + * allow composing multiple analytics and algorithms into a single program. 
+ * + * @param analytic the analytic to run on the Graph + * @param <T> the result type + * @throws Exception + */ + public <T> void run(GraphAnalytic<K, VV, EV, T> analytic) throws Exception { + analytic.run(this); + } + + /** * Groups by vertex and computes a GroupReduce transformation over the neighbors (both edges and vertices) * of each vertex. The neighborsFunction applied on the neighbors only has access to both the vertex id * and the vertex value of the grouping vertex. http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java new file mode 100644 index 0000000..dd221dc --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph; + +import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.api.java.DataSet; + +/** + * A {@code GraphAnalytic} is similar to a {@link GraphAlgorithm} but is terminal + * and results are retrieved via accumulators. A Flink program has a single + * point of execution. A {@code GraphAnalytic} defers execution to the user to + * allow composing multiple analytics and algorithms into a single program. + * + * @param <K> key type + * @param <VV> vertex value type + * @param <EV> edge value type + * @param <T> the return type + */ +public interface GraphAnalytic<K, VV, EV, T> { + + /** + * This method must be called after the program has executed: + * 1) "run" analytics and algorithms + * 2) call ExecutionEnvironment.execute() + * 3) get analytics results + * + * @return the result + */ + T getResult(); + + /** + * Execute the program and return the result. + * + * @return the result + * @throws Exception + */ + T execute() throws Exception; + + /** + * Execute the program and return the result. + * + * @param jobName the name to assign to the job + * @return the result + * @throws Exception + */ + T execute(String jobName) throws Exception; + + /** + * All {@code GraphAnalytic} processing must be terminated by an + * {@link OutputFormat}. Results are obtained via accumulators rather than + * returned by a {@link DataSet}. 
+ * + * @param input input graph + * @return this + * @throws Exception + */ + GraphAnalytic<K, VV, EV, T> run(Graph<K, VV, EV> input) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java deleted file mode 100644 index 1eafce2..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.graph.library; - - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.GraphAlgorithm; -import org.apache.flink.graph.ReduceNeighborsFunction; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Triplet; -import org.apache.flink.graph.EdgeDirection; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.VertexJoinFunction; -import org.apache.flink.types.NullValue; - -import java.util.TreeMap; - -/** - * Triangle Count Algorithm. - * - * This algorithm operates in three phases. First, vertices select neighbors with id greater than theirs - * and send messages to them. Each received message is then propagated to neighbors with higher id. - * Finally, if a node encounters the target id in the list of received messages, it increments the number - * of triangles found. - * - * This implementation is non - iterative. - * - * The algorithm takes an undirected, unweighted graph as input and outputs a DataSet - * which contains a single integer representing the number of triangles. 
- */ -public class GSATriangleCount<K extends Comparable<K>, VV, EV> implements - GraphAlgorithm<K, VV, EV, DataSet<Integer>> { - - @SuppressWarnings("serial") - @Override - public DataSet<Integer> run(Graph<K, VV, EV> input) throws Exception { - - ExecutionEnvironment env = input.getContext(); - - // order the edges so that src is always higher than trg - DataSet<Edge<K, NullValue>> edges = input.getEdges().map(new OrderEdges<K, EV>()).distinct(); - - Graph<K, TreeMap<K, Integer>, NullValue> graph = Graph.fromDataSet(edges, - new VertexInitializer<K>(), env); - - // select neighbors with ids higher than the current vertex id - // Gather: a no-op in this case - // Sum: create the set of neighbors - DataSet<Tuple2<K, TreeMap<K, Integer>>> higherIdNeighbors = - graph.reduceOnNeighbors(new GatherHigherIdNeighbors<K>(), EdgeDirection.IN); - - Graph<K, TreeMap<K, Integer>, NullValue> graphWithReinitializedVertexValues = - graph.mapVertices(new VertexInitializerEmptyTreeMap<K>()); - - // Apply: attach the computed values to the vertices - // joinWithVertices to update the node values - DataSet<Vertex<K, TreeMap<K, Integer>>> verticesWithHigherIdNeighbors = - graphWithReinitializedVertexValues.joinWithVertices(higherIdNeighbors, new AttachValues<K>()).getVertices(); - - Graph<K, TreeMap<K,Integer>, NullValue> graphWithNeighbors = Graph.fromDataSet(verticesWithHigherIdNeighbors, - edges, env); - - // propagate each received value to neighbors with higher id - // Gather: a no-op in this case - // Sum: propagate values - DataSet<Tuple2<K, TreeMap<K, Integer>>> propagatedValues = graphWithNeighbors - .reduceOnNeighbors(new GatherHigherIdNeighbors<K>(), EdgeDirection.IN); - - // Apply: attach propagated values to vertices - DataSet<Vertex<K, TreeMap<K, Integer>>> verticesWithPropagatedValues = - graphWithReinitializedVertexValues.joinWithVertices(propagatedValues, new AttachValues<K>()).getVertices(); - - Graph<K, TreeMap<K, Integer>, NullValue> graphWithPropagatedNeighbors = 
- Graph.fromDataSet(verticesWithPropagatedValues, graphWithNeighbors.getEdges(), env); - - // Scatter: compute the number of triangles - DataSet<Integer> numberOfTriangles = graphWithPropagatedNeighbors.getTriplets() - .map(new ComputeTriangles<K>()).reduce(new ReduceFunction<Integer>() { - - @Override - public Integer reduce(Integer first, Integer second) throws Exception { - return first + second; - } - }); - - return numberOfTriangles; - } - - @SuppressWarnings("serial") - private static final class OrderEdges<K extends Comparable<K>, EV> implements - MapFunction<Edge<K, EV>, Edge<K, NullValue>> { - - @Override - public Edge<K, NullValue> map(Edge<K, EV> edge) throws Exception { - if (edge.getSource().compareTo(edge.getTarget()) < 0) { - return new Edge<K, NullValue>(edge.getTarget(), edge.getSource(), NullValue.getInstance()); - } else { - return new Edge<K, NullValue>(edge.getSource(), edge.getTarget(), NullValue.getInstance()); - } - } - } - - @SuppressWarnings("serial") - private static final class VertexInitializer<K> implements MapFunction<K, TreeMap<K, Integer>> { - - @Override - public TreeMap<K, Integer> map(K value) throws Exception { - TreeMap<K, Integer> neighbors = new TreeMap<K, Integer>(); - neighbors.put(value, 1); - - return neighbors; - } - } - - @SuppressWarnings("serial") - private static final class VertexInitializerEmptyTreeMap<K> implements - MapFunction<Vertex<K, TreeMap<K, Integer>>, TreeMap<K, Integer>> { - - @Override - public TreeMap<K, Integer> map(Vertex<K, TreeMap<K, Integer>> vertex) throws Exception { - return new TreeMap<K, Integer>(); - } - } - - @SuppressWarnings("serial") - private static final class AttachValues<K> implements VertexJoinFunction<TreeMap<K, Integer>, - TreeMap<K, Integer>> { - - @Override - public TreeMap<K, Integer> vertexJoin(TreeMap<K, Integer> vertexValue, - TreeMap<K, Integer> inputValue) { - return inputValue; - } - } - - @SuppressWarnings("serial") - private static final class GatherHigherIdNeighbors<K> 
implements - ReduceNeighborsFunction<TreeMap<K,Integer>> { - - @Override - public TreeMap<K, Integer> reduceNeighbors(TreeMap<K,Integer> first, TreeMap<K,Integer> second) { - for (K key : second.keySet()) { - Integer value = first.get(key); - if (value != null) { - first.put(key, value + second.get(key)); - } else { - first.put(key, second.get(key)); - } - } - return first; - } - } - - @SuppressWarnings("serial") - private static final class ComputeTriangles<K> implements MapFunction<Triplet<K, TreeMap<K, Integer>, NullValue>, - Integer> { - - @Override - public Integer map(Triplet<K, TreeMap<K, Integer>, NullValue> triplet) throws Exception { - - Vertex<K, TreeMap<K, Integer>> srcVertex = triplet.getSrcVertex(); - Vertex<K, TreeMap<K, Integer>> trgVertex = triplet.getTrgVertex(); - int triangles = 0; - - if(trgVertex.getValue().get(srcVertex.getId()) != null) { - triangles = trgVertex.getValue().get(srcVertex.getId()); - } - return triangles; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java new file mode 100644 index 0000000..fc89e43 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library.clustering.undirected; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.flink.graph.AbstractGraphAnalytic; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient.Result; +import org.apache.flink.graph.library.metric.undirected.VertexMetrics; +import org.apache.flink.types.CopyableValue; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + +/** + * The global clustering coefficient measures the connectedness of a graph. + * Scores range from 0.0 (no triangles) to 1.0 (complete graph). + * + * @param <K> graph ID type + * @param <VV> vertex value type + * @param <EV> edge value type + */ +public class GlobalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV> +extends AbstractGraphAnalytic<K, VV, EV, Result> { + + private TriangleCount<K, VV, EV> triangleCount; + + private VertexMetrics<K, VV, EV> vertexMetrics; + + // Optional configuration + private int littleParallelism = PARALLELISM_DEFAULT; + + /** + * Override the parallelism of operators processing small amounts of data. 
+ * + * @param littleParallelism operator parallelism + * @return this + */ + public GlobalClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) { + this.littleParallelism = littleParallelism; + + return this; + } + + @Override + public GlobalClusteringCoefficient<K, VV, EV> run(Graph<K, VV, EV> input) + throws Exception { + super.run(input); + + triangleCount = new TriangleCount<K, VV, EV>() + .setLittleParallelism(littleParallelism); + + input.run(triangleCount); + + vertexMetrics = new VertexMetrics<K, VV, EV>() + .setParallelism(littleParallelism); + + input.run(vertexMetrics); + + return this; + } + + @Override + public Result getResult() { + return new Result(vertexMetrics.getResult().getNumberOfTriplets(), 3 * triangleCount.getResult()); + } + + /** + * Wraps global clustering coefficient metrics. + */ + public static class Result { + private long tripletCount; + private long triangleCount; + + public Result(long tripletCount, long triangleCount) { + this.tripletCount = tripletCount; + this.triangleCount = triangleCount; + } + + /** + * Get the number of triplets. + * + * @return number of triplets + */ + public long getNumberOfTriplets() { + return tripletCount; + } + + /** + * Get the number of triangles. + * + * @return number of triangles + */ + public long getNumberOfTriangles() { + return triangleCount; + } + + /** + * Get the global clustering coefficient score. This is computed as the + * number of closed triplets (triangles) divided by the total number of + * triplets. + * + * A score of {@code Double.NaN} is returned when the graph contains no triplets, + * for example a graph of isolated vertices, since the ratio is then undefined. + * + * @return global clustering coefficient score + */ + public double getLocalClusteringCoefficientScore() { + return (tripletCount == 0) ? 
Double.NaN : triangleCount / (double)tripletCount; + } + + @Override + public String toString() { + return "triplet count: " + tripletCount + ", triangle count:" + triangleCount; + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(tripletCount) + .append(triangleCount) + .hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { return false; } + if (obj == this) { return true; } + if (obj.getClass() != getClass()) { return false; } + + Result rhs = (Result)obj; + + return new EqualsBuilder() + .append(tripletCount, rhs.tripletCount) + .append(triangleCount, rhs.triangleCount) + .isEquals(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java index d1618d1..bc62d36 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java @@ -185,7 +185,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> { } /** - * Wraps the vertex type to encapsulate results from the Clustering Coefficient algorithm. + * Wraps the vertex type to encapsulate results from the local clustering coefficient algorithm. 
* * @param <T> ID type */ http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java new file mode 100644 index 0000000..bc43725 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.library.clustering.undirected; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.Utils.CountHelper; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.AbstractGraphAnalytic; +import org.apache.flink.graph.Graph; +import org.apache.flink.types.CopyableValue; +import org.apache.flink.util.AbstractID; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + +/** + * Count the number of distinct triangles in an undirected graph. + * + * @param <K> graph ID type + * @param <VV> vertex value type + * @param <EV> edge value type + * @see TriangleListing + */ +public class TriangleCount<K extends Comparable<K> & CopyableValue<K>, VV, EV> +extends AbstractGraphAnalytic<K, VV, EV, Long> { + + private String id = new AbstractID().toString(); + + // Optional configuration + private int littleParallelism = PARALLELISM_DEFAULT; + + /** + * Override the parallelism of operators processing small amounts of data. 
+ * + * @param littleParallelism operator parallelism + * @return this + */ + public TriangleCount<K, VV, EV> setLittleParallelism(int littleParallelism) { + this.littleParallelism = littleParallelism; + + return this; + } + + @Override + public TriangleCount<K, VV, EV> run(Graph<K, VV, EV> input) + throws Exception { + super.run(input); + + DataSet<Tuple3<K, K, K>> triangles = input + .run(new TriangleListing<K, VV, EV>() + .setSortTriangleVertices(false) + .setLittleParallelism(littleParallelism)); + + triangles.output(new CountHelper<Tuple3<K, K, K>>(id)).name("Count triangles"); + + return this; + } + + @Override + public Long getResult() { + return env.getLastJobExecutionResult().<Long> getAccumulatorResult(id); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java index 1319d02..6245433 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java @@ -286,11 +286,9 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> { @Override public Tuple3<T, T, T> map(Tuple3<T, T, T> value) throws Exception { - T temp_val; - // by the triangle listing algorithm we know f1 < f2 if (value.f0.compareTo(value.f1) > 0) { - temp_val = value.f0; + T temp_val = value.f0; value.f0 = value.f1; if (temp_val.compareTo(value.f2) <= 0) { 
http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java new file mode 100644 index 0000000..41ae27a --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.library.metric.undirected; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.io.RichOutputFormat; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.graph.AbstractGraphAnalytic; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree; +import org.apache.flink.graph.library.metric.undirected.VertexMetrics.Result; +import org.apache.flink.types.CopyableValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.util.AbstractID; + +import java.io.IOException; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + +/** + * Compute the number of vertices, number of edges, and number of triplets in + * an undirected graph. + * + * @param <K> graph ID type + * @param <VV> vertex value type + * @param <EV> edge value type + */ +public class VertexMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV> +extends AbstractGraphAnalytic<K, VV, EV, Result> { + + private String id = new AbstractID().toString(); + + // Optional configuration + private boolean includeZeroDegreeVertices = false; + + private boolean reduceOnTargetId = false; + + private int parallelism = PARALLELISM_DEFAULT; + + /** + * By default only the edge set is processed for the computation of degree. + * When this flag is set an additional join is performed against the vertex + * set in order to output vertices with a degree of zero. 
+ * + * @param includeZeroDegreeVertices whether to output vertices with a + * degree of zero + * @return this + */ + public VertexMetrics<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) { + this.includeZeroDegreeVertices = includeZeroDegreeVertices; + + return this; + } + + /** + * The degree can be counted from either the edge source or target IDs. + * By default the source IDs are counted. Reducing on target IDs may + * optimize the algorithm if the input edge list is sorted by target ID. + * + * @param reduceOnTargetId set to {@code true} if the input edge list + * is sorted by target ID + * @return this + */ + public VertexMetrics<K, VV, EV> setReduceOnTargetId(boolean reduceOnTargetId) { + this.reduceOnTargetId = reduceOnTargetId; + + return this; + } + + /** + * Override the operator parallelism. + * + * @param parallelism operator parallelism + * @return this + */ + public VertexMetrics<K, VV, EV> setParallelism(int parallelism) { + this.parallelism = parallelism; + + return this; + } + + @Override + public VertexMetrics<K, VV, EV> run(Graph<K, VV, EV> input) + throws Exception { + super.run(input); + + DataSet<Vertex<K, LongValue>> vertexDegree = input + .run(new VertexDegree<K, VV, EV>() + .setIncludeZeroDegreeVertices(includeZeroDegreeVertices) + .setReduceOnTargetId(reduceOnTargetId) + .setParallelism(parallelism)); + + vertexDegree.output(new VertexMetricsHelper<K>(id)).name("Vertex metrics"); + + return this; + } + + @Override + public Result getResult() { + JobExecutionResult res = env.getLastJobExecutionResult(); + + long vertexCount = res.getAccumulatorResult(id + "-0"); + long edgeCount = res.getAccumulatorResult(id + "-1"); + long tripletCount = res.getAccumulatorResult(id + "-2"); + + return new Result(vertexCount, edgeCount / 2, tripletCount); + } + + /** + * Helper class to collect vertex metrics. 
+ * + * @param <T> ID type + */ + private static class VertexMetricsHelper<T> + extends RichOutputFormat<Vertex<T, LongValue>> { + private final String id; + + private long vertexCount; + private long edgeCount; + private long tripletCount; + + /** + * This helper class collects vertex metrics by scanning over and + * discarding elements from the given DataSet. + * + * The unique id is required because Flink's accumulator namespace is + * shared among all operators. + * + * @param id unique string used for accumulator names + */ + public VertexMetricsHelper(String id) { + this.id = id; + } + + @Override + public void configure(Configuration parameters) {} + + @Override + public void open(int taskNumber, int numTasks) throws IOException {} + + @Override + public void writeRecord(Vertex<T, LongValue> record) throws IOException { + long degree = record.f1.getValue(); + + vertexCount++; + edgeCount += degree; + tripletCount += degree * (degree - 1) / 2; + } + + @Override + public void close() throws IOException { + getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount)); + getRuntimeContext().addAccumulator(id + "-1", new LongCounter(edgeCount)); + getRuntimeContext().addAccumulator(id + "-2", new LongCounter(tripletCount)); + } + } + + /** + * Wraps vertex metrics. + */ + public static class Result { + private long vertexCount; + private long edgeCount; + private long tripletCount; + + public Result(long vertexCount, long edgeCount, long tripletCount) { + this.vertexCount = vertexCount; + this.edgeCount = edgeCount; + this.tripletCount = tripletCount; + } + + /** + * Get the number of vertices. + * + * @return number of vertices + */ + public long getNumberOfVertices() { + return vertexCount; + } + + /** + * Get the number of edges. + * + * @return number of edges + */ + public long getNumberOfEdges() { + return edgeCount; + } + + /** + * Get the number of triplets. 
+ * + * @return number of triplets + */ + public long getNumberOfTriplets() { + return tripletCount; + } + + @Override + public String toString() { + return "vertex count: " + vertexCount + + ", edge count:" + edgeCount + + ", triplet count: " + tripletCount; + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(vertexCount) + .append(edgeCount) + .append(tripletCount) + .hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { return false; } + if (obj == this) { return true; } + if (obj.getClass() != getClass()) { return false; } + + Result rhs = (Result)obj; + + return new EqualsBuilder() + .append(vertexCount, rhs.vertexCount) + .append(edgeCount, rhs.edgeCount) + .append(tripletCount, rhs.tripletCount) + .isEquals(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficientTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficientTest.java new file mode 100644 index 0000000..71ec2a6 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficientTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library.clustering.undirected; + +import org.apache.commons.math3.util.CombinatoricsUtils; +import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient.Result; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class GlobalClusteringCoefficientTest +extends AsmTestBase { + + @Test + public void testWithSimpleGraph() + throws Exception { + Result expectedResult = new Result(13, 6); + + Result globalClusteringCoefficient = new GlobalClusteringCoefficient<IntValue, NullValue, NullValue>() + .run(undirectedSimpleGraph) + .execute(); + + assertEquals(expectedResult, globalClusteringCoefficient); + } + + @Test + public void testWithCompleteGraph() + throws Exception { + long expectedDegree = completeGraphVertexCount - 1; + long expectedCount = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2); + + Result expectedResult = new Result(expectedCount, expectedCount); + + Result globalClusteringCoefficient = new GlobalClusteringCoefficient<LongValue, NullValue, NullValue>() + .run(completeGraph) + .execute(); + + assertEquals(expectedResult, globalClusteringCoefficient); + } + + @Test + public void testWithEmptyGraph() + throws Exception { + Result expectedResult = new Result(0, 0); + + Result globalClusteringCoefficient = new 
GlobalClusteringCoefficient<LongValue, NullValue, NullValue>() + .run(emptyGraph) + .execute(); + + assertEquals(expectedResult, globalClusteringCoefficient); + } + + @Test + public void testWithRMatGraph() + throws Exception { + Result expectedResult = new Result(1003442, 225147); + + Result globalClusteringCoefficient = new GlobalClusteringCoefficient<LongValue, NullValue, NullValue>() + .run(undirectedRMatGraph) + .execute(); + + assertEquals(expectedResult, globalClusteringCoefficient); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java index 414f200..3455df4 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java @@ -30,6 +30,8 @@ import org.apache.flink.types.LongValue; import org.apache.flink.types.NullValue; import org.junit.Test; +import java.util.List; + import static org.junit.Assert.assertEquals; public class LocalClusteringCoefficientTest @@ -61,7 +63,11 @@ extends AsmTestBase { DataSet<Result<LongValue>> cc = completeGraph .run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>()); - for (Result<LongValue> result : cc.collect()) { + List<Result<LongValue>> results = cc.collect(); + + assertEquals(completeGraphVertexCount, results.size()); + + for (Result<LongValue> result : results) { assertEquals(expectedDegree, 
result.getDegree().getValue()); assertEquals(expectedTriangleCount, result.getTriangleCount().getValue()); } http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java new file mode 100644 index 0000000..6bf9b0d --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.library.clustering.undirected; + +import org.apache.commons.math3.util.CombinatoricsUtils; +import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TriangleCountTest +extends AsmTestBase { + + @Test + public void testWithSimpleGraph() + throws Exception { + long triangleCount = new TriangleCount<IntValue, NullValue, NullValue>() + .run(undirectedSimpleGraph) + .execute(); + + assertEquals(2, triangleCount); + } + + @Test + public void testWithCompleteGraph() + throws Exception { + long expectedDegree = completeGraphVertexCount - 1; + long expectedCount = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2) / 3; + + long triangleCount = new TriangleCount<LongValue, NullValue, NullValue>() + .run(completeGraph) + .execute(); + + assertEquals(expectedCount, triangleCount); + } + + @Test + public void testWithEmptyGraph() + throws Exception { + long triangleCount = new TriangleCount<LongValue, NullValue, NullValue>() + .run(emptyGraph) + .execute(); + + assertEquals(0, triangleCount); + } + + @Test + public void testWithRMatGraph() + throws Exception { + long triangleCount = new TriangleCount<LongValue, NullValue, NullValue>() + .run(undirectedRMatGraph) + .execute(); + + assertEquals(75049, triangleCount); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java 
new file mode 100644 index 0000000..a36ca94 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.library.metric.undirected; + +import org.apache.commons.math3.util.CombinatoricsUtils; +import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.library.metric.undirected.VertexMetrics.Result; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class VertexMetricsTest +extends AsmTestBase { + + @Test + public void testWithSimpleGraph() + throws Exception { + Result expectedResult = new Result(6, 7, 13); + + Result vertexMetrics = new VertexMetrics<IntValue, NullValue, NullValue>() + .run(undirectedSimpleGraph) + .execute(); + + assertEquals(expectedResult, vertexMetrics); + } + + @Test + public void testWithCompleteGraph() + throws Exception { + long expectedDegree = completeGraphVertexCount - 1; + long expectedEdges = completeGraphVertexCount * expectedDegree / 2; + long expectedTriplets = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2); + + Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets); + + Result vertexMetrics = new VertexMetrics<LongValue, NullValue, NullValue>() + .run(completeGraph) + .execute(); + + assertEquals(expectedResult, vertexMetrics); + } + + @Test + public void testWithEmptyGraph() + throws Exception { + Result expectedResult; + + expectedResult = new Result(0, 0, 0); + + Result withoutZeroDegreeVertices = new VertexMetrics<LongValue, NullValue, NullValue>() + .setIncludeZeroDegreeVertices(false) + .run(emptyGraph) + .execute(); + + assertEquals(withoutZeroDegreeVertices, expectedResult); + + expectedResult = new Result(3, 0, 0); + + Result withZeroDegreeVertices = new VertexMetrics<LongValue, NullValue, NullValue>() + .setIncludeZeroDegreeVertices(true) + .run(emptyGraph) + .execute(); + + assertEquals(expectedResult, withZeroDegreeVertices); + } + 
+ @Test + public void testWithRMatGraph() + throws Exception { + Result expectedResult = new Result(902, 10442, 1003442); + + Result withoutZeroDegreeVertices = new VertexMetrics<LongValue, NullValue, NullValue>() + .run(undirectedRMatGraph) + .execute(); + + assertEquals(expectedResult, withoutZeroDegreeVertices); + } +}