[FLINK-3907] [gelly] Directed Clustering Coefficient

This closes #2079
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d34bdaf7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d34bdaf7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d34bdaf7 Branch: refs/heads/master Commit: d34bdaf7f1de4dd4f742867e6b9e879c8b631765 Parents: d92aeb7 Author: Greg Hogan <c...@greghogan.com> Authored: Wed May 25 06:41:12 2016 -0400 Committer: EC2 Default User <ec2-user@ip-10-0-0-56.ec2.internal> Committed: Fri Jun 24 17:34:38 2016 +0000 ---------------------------------------------------------------------- docs/apis/batch/libs/gelly.md | 10 +- .../java/org/apache/flink/api/java/Utils.java | 2 +- .../graph/examples/ClusteringCoefficient.java | 235 ++++++++++ .../flink/graph/examples/JaccardIndex.java | 9 +- .../examples/LocalClusteringCoefficient.java | 136 ------ .../flink/graph/examples/TriangleListing.java | 182 +++++--- .../org/apache/flink/graph/scala/Graph.scala | 4 +- .../main/java/org/apache/flink/graph/Graph.java | 3 +- .../org/apache/flink/graph/GraphAnalytic.java | 2 +- .../degree/annotate/directed/VertexDegrees.java | 2 +- .../directed/GlobalClusteringCoefficient.java | 177 ++++++++ .../directed/LocalClusteringCoefficient.java | 271 ++++++++++++ .../clustering/directed/TriangleCount.java | 81 ++++ .../clustering/directed/TriangleListing.java | 440 +++++++++++++++++++ .../undirected/GlobalClusteringCoefficient.java | 25 +- .../undirected/LocalClusteringCoefficient.java | 16 +- .../clustering/undirected/TriangleCount.java | 4 +- .../clustering/undirected/TriangleListing.java | 5 +- .../library/metric/directed/VertexMetrics.java | 238 ++++++++++ .../metric/undirected/VertexMetrics.java | 4 +- .../apache/flink/graph/utils/Murmur3_32.java | 4 +- .../org/apache/flink/graph/asm/AsmTestBase.java | 6 +- .../annotate/directed/EdgeDegreesPairTest.java | 18 +- .../directed/EdgeSourceDegreesTest.java | 14 +- .../directed/EdgeTargetDegreesTest.java | 16 +- .../annotate/directed/VertexDegreesTest.java | 12 +- .../annotate/directed/VertexInDegreeTest.java | 6 +- .../annotate/directed/VertexOutDegreeTest.java | 6 +- .../GlobalClusteringCoefficientTest.java | 86 ++++ .../LocalClusteringCoefficientTest.java | 85 ++++ .../clustering/directed/TriangleCountTest.java | 77 ++++ .../directed/TriangleListingTest.java | 84 ++++ .../LocalClusteringCoefficientTest.java | 8 +- .../metric/directed/VertexMetricsTest.java | 99 +++++ .../library/similarity/JaccardIndexTest.java | 2 +- 35 files changed, 2107 insertions(+), 262 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/docs/apis/batch/libs/gelly.md ---------------------------------------------------------------------- diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md index 45fdbe5..166d2d2 100644 --- a/docs/apis/batch/libs/gelly.md +++ b/docs/apis/batch/libs/gelly.md @@ -2112,8 +2112,9 @@ divided by the number of potential edges between neighbors. See the [Triangle Enumeration](#triangle-enumeration) library method for a detailed explanation of triangle enumeration. #### Usage -The algorithm takes a simple, undirected graph as input and outputs a `DataSet` of tuples containing the vertex ID, -vertex degree, and number of triangles containing the vertex. The graph ID type must be `Comparable` and `Copyable`. +Directed and undirected variants are provided. 
The algorithms take a simple graph as input and output a `DataSet` of +tuples containing the vertex ID, vertex degree, and number of triangles containing the vertex. The graph ID type must be +`Comparable` and `Copyable`. ### Global Clustering Coefficient @@ -2126,8 +2127,9 @@ See the [Local Clustering Coefficient](#local-clustering-coefficient) library me clustering coefficient. #### Usage -The algorithm takes a simple, undirected graph as input and outputs a result containing the total number of triplets and -triangles in the graph. The graph ID type must be `Comparable` and `Copyable`. +Directed and undirected variants are provided. The algorithm takes a simple graph as input and outputs a result +containing the total number of triplets and triangles in the graph. The graph ID type must be `Comparable` and +`Copyable`. {% top %} http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-java/src/main/java/org/apache/flink/api/java/Utils.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java index 89b6d17..36ccb23 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java @@ -204,7 +204,7 @@ public final class Utils { @Override public int hashCode() { - return (int) (this.count + this.hashCode()); + return (int) (this.count + this.checksum); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java new file mode 100644 index 0000000..547ef97 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.examples; + +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.commons.lang3.text.WordUtils; +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.CsvOutputFormat; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAnalytic; +import org.apache.flink.graph.asm.translate.LongValueToIntValue; +import org.apache.flink.graph.asm.translate.TranslateGraphIds; +import org.apache.flink.graph.generator.RMatGraph; +import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; +import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +import java.text.NumberFormat; + +/** + * Driver for the library implementations of Global and Local Clustering Coefficient. + * + * This example reads a simple directed or undirected graph from a CSV file or + * generates an RMat graph with the given scale and edge factor then calculates + * the local clustering coefficient for each vertex and the global clustering + * coefficient for the graph. + * + * @see org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient + * @see org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient + * @see org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient + * @see org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient + */ +public class ClusteringCoefficient { + + public static final int DEFAULT_SCALE = 10; + + public static final int DEFAULT_EDGE_FACTOR = 16; + + public static final boolean DEFAULT_CLIP_AND_FLIP = true; + + private static void printUsage() { + System.out.println(WordUtils.wrap("The local clustering coefficient measures the connectedness of each" + + " vertex's neighborhood and the global clustering coefficient measures the connectedness of the graph." 
+ + " Scores range from 0.0 (no edges between neighbors or vertices) to 1.0 (neighborhood or graph" + + " is a clique).", 80)); + System.out.println(); + System.out.println(WordUtils.wrap("This algorithm returns tuples containing the vertex ID, the degree of" + + " the vertex, and the number of edges between vertex neighbors.", 80)); + System.out.println(); + System.out.println("usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]"); + System.out.println(); + System.out.println("options:"); + System.out.println(" --input csv --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]"); + System.out.println(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]"); + System.out.println(); + System.out.println(" --output print"); + System.out.println(" --output hash"); + System.out.println(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]"); + } + + public static void main(String[] args) throws Exception { + // Set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableObjectReuse(); + + ParameterTool parameters = ParameterTool.fromArgs(args); + if (! parameters.has("directed")) { + printUsage(); + return; + } + boolean directedAlgorithm = parameters.getBoolean("directed"); + + // global and local clustering coefficient results + GraphAnalytic gcc; + DataSet lcc; + + switch (parameters.get("input", "")) { + case "csv": { + String lineDelimiter = StringEscapeUtils.unescapeJava( + parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); + + String fieldDelimiter = StringEscapeUtils.unescapeJava( + parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); + + Graph<LongValue, NullValue, NullValue> graph = Graph + .fromCsvReader(parameters.get("input_filename"), env) + .ignoreCommentsEdges("#") + .lineDelimiterEdges(lineDelimiter) + .fieldDelimiterEdges(fieldDelimiter) + .keyType(LongValue.class); + + if (directedAlgorithm) { + gcc = graph + .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()); + lcc = graph + .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()); + } else { + gcc = graph + .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()); + lcc = graph + .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()); + } + } break; + + case "rmat": { + int scale = parameters.getInt("scale", DEFAULT_SCALE); + int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); + + RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory(); + + long vertexCount = 1L << scale; + long edgeCount = vertexCount * edgeFactor; + + Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) + .generate(); + + if (directedAlgorithm) { + if (scale > 32) { + Graph<LongValue, NullValue, NullValue> newGraph = graph + .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()); + + gcc = newGraph + .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, 
NullValue>()); + lcc = newGraph + .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()); + } else { + Graph<IntValue, NullValue, NullValue> newGraph = graph + .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue())) + .run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>()); + + gcc = newGraph + .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()); + lcc = newGraph + .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()); + } + } else { + boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); + + if (scale > 32) { + Graph<LongValue, NullValue, NullValue> newGraph = graph + .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip)); + + gcc = newGraph + .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()); + lcc = newGraph + .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()); + } else { + Graph<IntValue, NullValue, NullValue> newGraph = graph + .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue())) + .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip)); + + gcc = newGraph + .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()); + lcc = newGraph + .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()); + } + } + } break; + + default: + printUsage(); + return; + } + + switch (parameters.get("output", "")) { + case "print": + if (directedAlgorithm) { + for (Object e: lcc.collect()) { + org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result result = + (org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result)e; + System.out.println(result.toVerboseString()); + } + } else { + for (Object e: lcc.collect()) { + org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result result = + (org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result)e; + System.out.println(result.toVerboseString()); + } + } + System.out.println(gcc.getResult()); + break; + + case "hash": + System.out.println(DataSetUtils.checksumHashCode(lcc)); + System.out.println(gcc.getResult()); + break; + + case "csv": + String filename = parameters.get("output_filename"); + + String lineDelimiter = StringEscapeUtils.unescapeJava( + parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); + + String fieldDelimiter = StringEscapeUtils.unescapeJava( + parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); + + lcc.writeAsCsv(filename, lineDelimiter, fieldDelimiter); + + System.out.println(gcc.execute()); + break; + + default: + printUsage(); + return; + } + + JobExecutionResult result = env.getLastJobExecutionResult(); + + NumberFormat nf = NumberFormat.getInstance(); + System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms"); + } +} 
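For reference, below is a minimal sketch of driving the new directed library methods directly, outside the full ClusteringCoefficient example driver added by this patch. It is illustrative only and not part of the commit: the class name and the hand-built edge data are made up, and it assumes the Graph.run(GraphAnalytic) change in this patch (run now returns the analytic so its result can be read after execution).

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAnalytic;
import org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient;
import org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;

public class DirectedClusteringCoefficientSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // a hand-built simple directed graph: a directed triangle plus one reciprocal edge,
        // with no duplicate edges or self-loops as required by the library methods
        DataSet<Edge<LongValue, NullValue>> edges = env.fromElements(
            new Edge<>(new LongValue(0), new LongValue(1), NullValue.getInstance()),
            new Edge<>(new LongValue(1), new LongValue(2), NullValue.getInstance()),
            new Edge<>(new LongValue(2), new LongValue(0), NullValue.getInstance()),
            new Edge<>(new LongValue(1), new LongValue(0), NullValue.getInstance()));

        Graph<LongValue, NullValue, NullValue> graph = Graph.fromDataSet(edges, env);

        // GraphAlgorithm: per-vertex degree, triangle count, and local clustering coefficient
        DataSet<LocalClusteringCoefficient.Result<LongValue>> lcc = graph
            .run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>());

        // GraphAnalytic: run() returns the analytic, which is queried after the job has executed
        GraphAnalytic<LongValue, NullValue, NullValue, GlobalClusteringCoefficient.Result> gcc = graph
            .run(new GlobalClusteringCoefficient<LongValue, NullValue, NullValue>());

        // collect() triggers execution; the analytic's accumulator-backed result is read afterwards
        for (LocalClusteringCoefficient.Result<LongValue> result : lcc.collect()) {
            System.out.println(result.toVerboseString());
        }
        System.out.println(gcc.getResult());
    }
}

This mirrors the "print" output path of the ClusteringCoefficient driver in this patch: executing the plan via collect() before calling getResult(), since the global coefficient is computed from accumulators of the last job execution.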
http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java index c078d73..46a296a 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java @@ -97,10 +97,10 @@ public class JaccardIndex { Graph<LongValue, NullValue, NullValue> graph = Graph .fromCsvReader(parameters.get("input_filename"), env) - .ignoreCommentsEdges("#") - .lineDelimiterEdges(lineDelimiter) - .fieldDelimiterEdges(fieldDelimiter) - .keyType(LongValue.class); + .ignoreCommentsEdges("#") + .lineDelimiterEdges(lineDelimiter) + .fieldDelimiterEdges(fieldDelimiter) + .keyType(LongValue.class); ji = graph .run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>()); @@ -162,6 +162,7 @@ public class JaccardIndex { env.execute(); break; + default: printUsage(); return; http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java deleted file mode 100644 index bed68b2..0000000 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.graph.examples; - -import org.apache.commons.lang3.text.WordUtils; -import org.apache.commons.math3.random.JDKRandomGenerator; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.CsvOutputFormat; -import org.apache.flink.api.java.utils.DataSetUtils; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.asm.simple.undirected.Simplify; -import org.apache.flink.graph.asm.translate.LongValueToIntValue; -import org.apache.flink.graph.asm.translate.TranslateGraphIds; -import org.apache.flink.graph.generator.RMatGraph; -import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; -import org.apache.flink.graph.generator.random.RandomGenerableFactory; -import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; - -import java.text.NumberFormat; - -/** - * Driver for the library implementation of Local Clustering Coefficient. - * - * This example generates an undirected RMat graph with the given scale and - * edge factor then calculates the local clustering coefficient for each vertex. - * - * @see org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient - */ -public class LocalClusteringCoefficient { - - public static final int DEFAULT_SCALE = 10; - - public static final int DEFAULT_EDGE_FACTOR = 16; - - public static final boolean DEFAULT_CLIP_AND_FLIP = true; - - public static void main(String[] args) throws Exception { - // Set up the execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); - - ParameterTool parameters = ParameterTool.fromArgs(args); - - // Generate RMat graph - int scale = parameters.getInt("scale", DEFAULT_SCALE); - int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); - - RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory(); - - long vertexCount = 1L << scale; - long edgeCount = vertexCount * edgeFactor; - - boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); - - Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) - .generate() - .run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip)); - - DataSet cc; - - if (scale > 32) { - cc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()); - } else { - cc = graph - .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue())) - .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()); - } - - switch (parameters.get("output", "")) { - case "print": - for (Object e: cc.collect()) { - Result result = (Result)e; - System.out.println(result.toVerboseString()); - } - break; - - case "hash": - System.out.println(DataSetUtils.checksumHashCode(cc)); - break; - - case "csv": - String filename = parameters.get("filename"); - - String row_delimiter = parameters.get("row_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER); - String field_delimiter = parameters.get("field_delimiter", 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER); - - cc.writeAsCsv(filename, row_delimiter, field_delimiter); - - env.execute(); - break; - default: - System.out.println(WordUtils.wrap("The local clustering coefficient measures the connectedness of each" + - " vertex's neighborhood. Scores range from 0.0 (no edges between neighbors) to 1.0 (neighborhood" + - " is a clique).", 80)); - System.out.println(); - System.out.println(WordUtils.wrap("This algorithm returns tuples containing the vertex ID, the degree of" + - " the vertex, the number of edges between vertex neighbors, and the local clustering coefficient.", 80)); - System.out.println(); - System.out.println("usage:"); - System.out.println(" LocalClusteringCoefficient [--scale SCALE] [--edge_factor EDGE_FACTOR] --output print"); - System.out.println(" LocalClusteringCoefficient [--scale SCALE] [--edge_factor EDGE_FACTOR] --output hash"); - System.out.println(" LocalClusteringCoefficient [--scale SCALE] [--edge_factor EDGE_FACTOR] --output csv" + - " --filename FILENAME [--row_delimiter ROW_DELIMITER] [--field_delimiter FIELD_DELIMITER]"); - - return; - } - - JobExecutionResult result = env.getLastJobExecutionResult(); - - NumberFormat nf = NumberFormat.getInstance(); - System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java index a20bf20..9bd41f0 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java @@ -18,6 +18,8 @@ package org.apache.flink.graph.examples; +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.commons.lang3.text.WordUtils; import org.apache.commons.math3.random.JDKRandomGenerator; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.java.DataSet; @@ -41,9 +43,11 @@ import java.text.NumberFormat; /** * Driver for the library implementation of Triangle Listing. * - * This example generates an undirected RMat graph with the given scale - * and edge factor then lists all triangles. + * This example reads a simple directed or undirected graph from a CSV file or + * generates an RMat graph with the given scale and edge factor then lists + * all triangles. 
* + * @see org.apache.flink.graph.library.clustering.directed.TriangleListing * @see org.apache.flink.graph.library.clustering.undirected.TriangleListing */ public class TriangleListing { @@ -54,68 +58,142 @@ public class TriangleListing { public static final boolean DEFAULT_CLIP_AND_FLIP = true; + private static void printUsage() { + System.out.println(WordUtils.wrap("Lists all triangles in a graph.", 80)); + System.out.println(); + System.out.println(WordUtils.wrap("This algorithm returns tuples containing the vertex IDs for each triangle and" + + " for directed graphs a bitmask indicating the presence of the six potential connecting edges.", 80)); + System.out.println(); + System.out.println("usage: TriangleListing --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]"); + System.out.println(); + System.out.println("options:"); + System.out.println(" --input csv --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]"); + System.out.println(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]"); + System.out.println(); + System.out.println(" --output print"); + System.out.println(" --output hash"); + System.out.println(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]"); + } + public static void main(String[] args) throws Exception { // Set up the execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().enableObjectReuse(); ParameterTool parameters = ParameterTool.fromArgs(args); - - // Generate RMat graph - int scale = parameters.getInt("scale", DEFAULT_SCALE); - int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); - - RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory(); - - long vertexCount = 1L << scale; - long edgeCount = vertexCount * edgeFactor; - - boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); - - Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) - .generate() - .run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip)); + if (! 
parameters.has("directed")) { + printUsage(); + return; + } + boolean directedAlgorithm = parameters.getBoolean("directed"); DataSet tl; - if (scale > 32) { - tl = graph - .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>()); - } else { - tl = graph - .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue())) - .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, NullValue, NullValue>()); + switch (parameters.get("input", "")) { + case "csv": { + String lineDelimiter = StringEscapeUtils.unescapeJava( + parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); + + String fieldDelimiter = StringEscapeUtils.unescapeJava( + parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); + + Graph<LongValue, NullValue, NullValue> graph = Graph + .fromCsvReader(parameters.get("input_filename"), env) + .ignoreCommentsEdges("#") + .lineDelimiterEdges(lineDelimiter) + .fieldDelimiterEdges(fieldDelimiter) + .keyType(LongValue.class); + + if (directedAlgorithm) { + tl = graph + .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>()); + } else { + tl = graph + .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>()); + } + + } break; + + case "rmat": { + int scale = parameters.getInt("scale", DEFAULT_SCALE); + int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); + + RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory(); + + long vertexCount = 1L << scale; + long edgeCount = vertexCount * edgeFactor; + + Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) + .generate(); + + if (directedAlgorithm) { + if (scale > 32) { + tl = graph + .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()) + .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>()); + } else { + tl = graph + .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue())) + .run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>()) + .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<IntValue, NullValue, NullValue>()); + } + } else { + boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); + + graph = graph + .run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip)); + + if (scale > 32) { + tl = graph + .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip)) + .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>()); + } else { + tl = graph + .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue())) + .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip)) + .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, NullValue, NullValue>()); + } + } + } break; + + default: + printUsage(); + return; } switch (parameters.get("output", "")) { - case "print": - tl.print(); - break; - - case "hash": - System.out.println(DataSetUtils.checksumHashCode(tl)); - break; - - case "csv": - String filename = 
parameters.get("filename"); - - String row_delimiter = parameters.get("row_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER); - String field_delimiter = parameters.get("field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER); - - tl.writeAsCsv(filename, row_delimiter, field_delimiter); - - env.execute(); - break; - default: - System.out.println("Lists all distinct triangles in the generated RMat graph."); - System.out.println(); - System.out.println("usage:"); - System.out.println(" TriangleListing [--scale SCALE] [--edge_factor EDGE_FACTOR] --output print"); - System.out.println(" TriangleListing [--scale SCALE] [--edge_factor EDGE_FACTOR] --output hash"); - System.out.println(" TriangleListing [--scale SCALE] [--edge_factor EDGE_FACTOR] --output csv" + - " --filename FILENAME [--row_delimiter ROW_DELIMITER] [--field_delimiter FIELD_DELIMITER]"); - - return; + case "print": + if (directedAlgorithm) { + for (Object e: tl.collect()) { + org.apache.flink.graph.library.clustering.directed.TriangleListing.Result result = + (org.apache.flink.graph.library.clustering.directed.TriangleListing.Result) e; + System.out.println(result.toVerboseString()); + } + } else { + tl.print(); + } + break; + + case "hash": + System.out.println(DataSetUtils.checksumHashCode(tl)); + break; + + case "csv": + String filename = parameters.get("output_filename"); + + String lineDelimiter = StringEscapeUtils.unescapeJava( + parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); + + String fieldDelimiter = StringEscapeUtils.unescapeJava( + parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); + + tl.writeAsCsv(filename, lineDelimiter, fieldDelimiter); + + env.execute(); + break; + default: + printUsage(); + return; } JobExecutionResult result = env.getLastJobExecutionResult(); http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala index 3881aae..165f6c2 100644 --- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala +++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala @@ -1127,8 +1127,10 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { * * @param analytic the analytic to run on the Graph */ - def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, T])= { + def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, T]): + GraphAnalytic[K, VV, EV, T] = { jgraph.run(analytic) + analytic } /** http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index dd25cfd..3dbb9c4 100755 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -1797,8 +1797,9 @@ public class Graph<K, VV, EV> { * @param <T> the result type * @throws Exception */ - public <T> void 
run(GraphAnalytic<K, VV, EV, T> analytic) throws Exception { + public <T> GraphAnalytic<K, VV, EV, T> run(GraphAnalytic<K, VV, EV, T> analytic) throws Exception { analytic.run(this); + return analytic; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java index dd221dc..0bdc792 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java @@ -38,7 +38,7 @@ public interface GraphAnalytic<K, VV, EV, T> { * This method must be called after the program has executed: * 1) "run" analytics and algorithms * 2) call ExecutionEnvironment.execute() - * 3) get analytics results + * 3) get analytic results * * @return the result */ http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java index 71a0859..1f1d4ab 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java @@ -142,7 +142,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Degrees>>> { } /** - * Combine mutual edges. + * Reduce bitmasks to a single value using bitwise-or. * * @param <T> ID type */ http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java new file mode 100644 index 0000000..3b5452b --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java @@ -0,0 +1,177 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.graph.library.clustering.directed; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.flink.graph.AbstractGraphAnalytic; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient.Result; +import org.apache.flink.graph.library.metric.directed.VertexMetrics; +import org.apache.flink.types.CopyableValue; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + +/** + * The global clustering coefficient measures the connectedness of a graph. + * Scores range from 0.0 (no triangles) to 1.0 (complete graph). + * + * @param <K> graph ID type + * @param <VV> vertex value type + * @param <EV> edge value type + */ +public class GlobalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV> +extends AbstractGraphAnalytic<K, VV, EV, Result> { + + private TriangleCount<K, VV, EV> triangleCount; + + private VertexMetrics<K, VV, EV> vertexMetrics; + + // Optional configuration + private int littleParallelism = PARALLELISM_DEFAULT; + + /** + * Override the parallelism of operators processing small amounts of data. + * + * @param littleParallelism operator parallelism + * @return this + */ + public GlobalClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) { + this.littleParallelism = littleParallelism; + + return this; + } + + /* + * Implementation notes: + * + * The requirement that "K extends CopyableValue<K>" can be removed when + * removed from TriangleListing. + */ + + @Override + public GlobalClusteringCoefficient<K, VV, EV> run(Graph<K, VV, EV> input) + throws Exception { + super.run(input); + + triangleCount = new TriangleCount<K, VV, EV>() + .setLittleParallelism(littleParallelism); + + input.run(triangleCount); + + vertexMetrics = new VertexMetrics<K, VV, EV>() + .setParallelism(littleParallelism); + + input.run(vertexMetrics); + + return this; + } + + @Override + public Result getResult() { + // each triangle must be counted from each of the three vertices + // as each triplet is counted in this manner + long numberOfTriangles = 3 * triangleCount.getResult(); + + return new Result(vertexMetrics.getResult().getNumberOfTriplets(), numberOfTriangles); + } + + /** + * Wraps global clustering coefficient metrics. + */ + public static class Result { + private long tripletCount; + + private long triangleCount; + + /** + * Instantiate an immutable result. + * + * @param tripletCount triplet count + * @param triangleCount triangle count + */ + public Result(long tripletCount, long triangleCount) { + this.tripletCount = tripletCount; + this.triangleCount = triangleCount; + } + + /** + * Get the number of triplets. + * + * @return number of triplets + */ + public long getNumberOfTriplets() { + return tripletCount; + } + + /** + * Get the number of triangles. + * + * @return number of triangles + */ + public long getNumberOfTriangles() { + return triangleCount; + } + + /** + * Get the global clustering coefficient score. This is computed as the + * number of closed triplets (triangles) divided by the total number of + * triplets. + * + * A score of {@code Double.NaN} is returned for a graph of isolated vertices + * for which both the triangle count and number of neighbors are zero. 
+ * + * @return global clustering coefficient score + */ + public double getGlobalClusteringCoefficientScore() { + return (tripletCount == 0) ? Double.NaN : triangleCount / (double)tripletCount; + } + + @Override + public String toString() { + return "triplet count: " + tripletCount + + ", triangle count: " + triangleCount + + ", global clustering coefficient: " + getGlobalClusteringCoefficientScore(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(tripletCount) + .append(triangleCount) + .hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { return false; } + if (obj == this) { return true; } + if (obj.getClass() != getClass()) { return false; } + + Result rhs = (Result)obj; + + return new EqualsBuilder() + .append(tripletCount, rhs.tripletCount) + .append(triangleCount, rhs.triangleCount) + .isEquals(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java new file mode 100644 index 0000000..1314771 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.library.clustering.directed; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees; +import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; +import org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result; +import org.apache.flink.graph.utils.Murmur3_32; +import org.apache.flink.types.CopyableValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + +/** + * The local clustering coefficient measures the connectedness of each vertex's + * neighborhood. Scores range from 0.0 (no edges between neighbors) to 1.0 + * (neighborhood is a clique). + * <br/> + * An edge between a vertex's neighbors is a triangle. Counting edges between + * neighbors is equivalent to counting the number of triangles which include + * the vertex. + * <br/> + * The input graph must be a simple graph containing no duplicate edges or + * self-loops. + * + * @param <K> graph ID type + * @param <VV> vertex value type + * @param <EV> edge value type + */ +public class LocalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV> +implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> { + + // Optional configuration + private int littleParallelism = PARALLELISM_DEFAULT; + + /** + * Override the parallelism of operators processing small amounts of data. + * + * @param littleParallelism operator parallelism + * @return this + */ + public LocalClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) { + Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT, + "The parallelism must be greater than zero."); + + this.littleParallelism = littleParallelism; + + return this; + } + + /* + * Implementation notes: + * + * The requirement that "K extends CopyableValue<K>" can be removed when + * removed from TriangleListing. + * + * CountVertices can be replaced by ".sum(1)" when Flink aggregators use + * code generation. 
+ */ + + @Override + public DataSet<Result<K>> run(Graph<K, VV, EV> input) + throws Exception { + // u, v, w, bitmask + DataSet<TriangleListing.Result<K>> triangles = input + .run(new TriangleListing<K,VV,EV>() + .setSortTriangleVertices(false) + .setLittleParallelism(littleParallelism)); + + // u, edge count + DataSet<Tuple2<K, LongValue>> triangleVertices = triangles + .flatMap(new SplitTriangles<K>()) + .name("Split triangle vertices"); + + // u, triangle count + DataSet<Tuple2<K, LongValue>> vertexTriangleCount = triangleVertices + .groupBy(0) + .reduce(new CountTriangles<K>()) + .name("Count triangles"); + + // u, deg(u) + DataSet<Vertex<K, Degrees>> vertexDegree = input + .run(new VertexDegrees<K, VV, EV>() + .setParallelism(littleParallelism) + .setIncludeZeroDegreeVertices(true)); + + // u, deg(u), triangle count + return vertexDegree + .leftOuterJoin(vertexTriangleCount) + .where(0) + .equalTo(0) + .with(new JoinVertexDegreeWithTriangleCount<K>()) + .setParallelism(littleParallelism) + .name("Clustering coefficient"); + } + + /** + * Emits the three vertex IDs comprising each triangle along with an initial count. + * + * @param <T> ID type + */ + private class SplitTriangles<T> + implements FlatMapFunction<TriangleListing.Result<T>, Tuple2<T, LongValue>> { + private LongValue one = new LongValue(1); + + private LongValue two = new LongValue(2); + + private Tuple2<T, LongValue> output = new Tuple2<>(); + + @Override + public void flatMap(TriangleListing.Result<T> value, Collector<Tuple2<T, LongValue>> out) + throws Exception { + byte bitmask = value.f3.getValue(); + + output.f0 = value.f0; + output.f1 = ((bitmask & 0b000011) == 0b000011) ? two : one; + out.collect(output); + + output.f0 = value.f1; + output.f1 = ((bitmask & 0b001100) == 0b001100) ? two : one; + out.collect(output); + + output.f0 = value.f2; + output.f1 = ((bitmask & 0b110000) == 0b110000) ? two : one; + out.collect(output); + } + } + + /** + * Sums the triangle count for each vertex ID. + * + * @param <T> ID type + */ + @FunctionAnnotation.ForwardedFields("0") + private class CountTriangles<T> + implements ReduceFunction<Tuple2<T, LongValue>> { + @Override + public Tuple2<T, LongValue> reduce(Tuple2<T, LongValue> left, Tuple2<T, LongValue> right) + throws Exception { + left.f1.setValue(left.f1.getValue() + right.f1.getValue()); + return left; + } + } + + /** + * Joins the vertex and degree with the vertex's triangle count. + * + * @param <T> ID type + */ + @FunctionAnnotation.ForwardedFieldsFirst("0; 1.0->1.0") + @FunctionAnnotation.ForwardedFieldsSecond("0") + private class JoinVertexDegreeWithTriangleCount<T> + implements JoinFunction<Vertex<T, Degrees>, Tuple2<T, LongValue>, Result<T>> { + private LongValue zero = new LongValue(0); + + private Result<T> output = new Result<>(); + + @Override + public Result<T> join(Vertex<T, Degrees> vertexAndDegree, Tuple2<T, LongValue> vertexAndTriangleCount) + throws Exception { + output.f0 = vertexAndDegree.f0; + output.f1.f0 = vertexAndDegree.f1.f0; + output.f1.f1 = (vertexAndTriangleCount == null) ? zero : vertexAndTriangleCount.f1; + + return output; + } + } + + /** + * Wraps the vertex type to encapsulate results from the Local Clustering Coefficient algorithm. + * + * @param <T> ID type + */ + public static class Result<T> + extends Vertex<T, Tuple2<LongValue, LongValue>> { + public static final int HASH_SEED = 0x37a208c4; + + private Murmur3_32 hasher = new Murmur3_32(HASH_SEED); + + /** + * No-args constructor. 
+ */ + public Result() { + f1 = new Tuple2<>(); + } + + /** + * Get the vertex degree. + * + * @return vertex degree + */ + public LongValue getDegree() { + return f1.f0; + } + + /** + * Get the number of triangles containing this vertex; equivalently, + * this is the number of edges between neighbors of this vertex. + * + * @return triangle count + */ + public LongValue getTriangleCount() { + return f1.f1; + } + + /** + * Get the local clustering coefficient score. This is computed as the + * number of edges between neighbors, equal to the triangle count, + * divided by the number of potential edges between neighbors. + * + * A score of {@code Double.NaN} is returned for a vertex with degree 1 + * for which both the triangle count and number of neighbors are zero. + * + * @return local clustering coefficient score + */ + public double getLocalClusteringCoefficientScore() { + long degree = getDegree().getValue(); + long neighborPairs = degree * (degree - 1); + + return (neighborPairs == 0) ? Double.NaN : getTriangleCount().getValue() / (double)neighborPairs; + } + + /** + * Format values into a human-readable string. + * + * @return verbose string + */ + public String toVerboseString() { + return "Vertex ID: " + f0 + + ", vertex degree: " + getDegree() + + ", triangle count: " + getTriangleCount() + + ", local clustering coefficient: " + getLocalClusteringCoefficientScore(); + } + + @Override + public int hashCode() { + return hasher.reset() + .hash(f0.hashCode()) + .hash(f1.f0.getValue()) + .hash(f1.f1.getValue()) + .hash(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleCount.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleCount.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleCount.java new file mode 100644 index 0000000..9ba53f1 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleCount.java @@ -0,0 +1,81 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.flink.graph.library.clustering.directed; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.Utils.CountHelper; +import org.apache.flink.graph.AbstractGraphAnalytic; +import org.apache.flink.graph.Graph; +import org.apache.flink.types.CopyableValue; +import org.apache.flink.util.AbstractID; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + +/** + * Count the number of distinct triangles in an undirected graph. + * + * @param <K> graph ID type + * @param <VV> vertex value type + * @param <EV> edge value type + * @see TriangleListing + */ +public class TriangleCount<K extends Comparable<K> & CopyableValue<K>, VV, EV> +extends AbstractGraphAnalytic<K, VV, EV, Long> { + + private String id = new AbstractID().toString(); + + // Optional configuration + private int littleParallelism = PARALLELISM_DEFAULT; + + /** + * Override the parallelism of operators processing small amounts of data. + * + * @param littleParallelism operator parallelism + * @return this + */ + public TriangleCount<K, VV, EV> setLittleParallelism(int littleParallelism) { + this.littleParallelism = littleParallelism; + + return this; + } + + @Override + public TriangleCount<K, VV, EV> run(Graph<K, VV, EV> input) + throws Exception { + super.run(input); + + DataSet<TriangleListing.Result<K>> triangles = input + .run(new TriangleListing<K, VV, EV>() + .setSortTriangleVertices(false) + .setLittleParallelism(littleParallelism)); + + triangles + .output(new CountHelper<TriangleListing.Result<K>>(id)) + .name("Count triangles"); + + return this; + } + + @Override + public Long getResult() { + return env.getLastJobExecutionResult().<Long> getAccumulatorResult(id); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java new file mode 100644 index 0000000..5c364f5 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.library.clustering.directed; + +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.base.JoinOperatorBase; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeOrder; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair; +import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; +import org.apache.flink.types.ByteValue; +import org.apache.flink.types.CopyableValue; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + +/** + * Generates a listing of distinct triangles from the input graph. + * <br/> + * A triangle is a 3-clique with vertices A, B, and C connected by edges + * (A, B), (A, C), and (B, C). + * <br/> + * The input graph must not contain duplicate edges or self-loops. + * + * @param <K> graph ID type + * @param <VV> vertex value type + * @param <EV> edge value type + */ +public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV> +implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> { + + // Optional configuration + private boolean sortTriangleVertices = false; + + private int littleParallelism = PARALLELISM_DEFAULT; + + /** + * Normalize the triangle listing such that for each result (K0, K1, K2) + * the vertex IDs are sorted K0 < K1 < K2. + * + * @param sortTriangleVertices whether to output each triangle's vertices in sorted order + * @return this + */ + public TriangleListing<K, VV, EV> setSortTriangleVertices(boolean sortTriangleVertices) { + this.sortTriangleVertices = sortTriangleVertices; + + return this; + } + + /** + * Override the parallelism of operators processing small amounts of data. + * + * @param littleParallelism operator parallelism + * @return this + */ + public TriangleListing<K, VV, EV> setLittleParallelism(int littleParallelism) { + Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT, + "The parallelism must be greater than zero."); + + this.littleParallelism = littleParallelism; + + return this; + } + + /* + * Implementation notes: + * + * The requirement that "K extends CopyableValue<K>" can be removed when + * Flink has a self-join and GenerateTriplets is implemented as such. + * + * ProjectTriangles should eventually be replaced by ".projectFirst("*")" + * when projections use code generation. 
+ */ + + @Override + public DataSet<Result<K>> run(Graph<K, VV, EV> input) + throws Exception { + // u, v, bitmask where u < v + DataSet<Tuple3<K, K, ByteValue>> filteredByID = input + .getEdges() + .map(new OrderByID<K, EV>()) + .setParallelism(littleParallelism) + .name("Order by ID") + .groupBy(0, 1) + .reduceGroup(new ReduceBitmask<K>()) + .setParallelism(littleParallelism) + .name("Flatten by ID"); + + // u, v, (deg(u), deg(v)) + DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> pairDegrees = input + .run(new EdgeDegreesPair<K, VV, EV>() + .setParallelism(littleParallelism)); + + // u, v, bitmask where deg(u) < deg(v) or (deg(u) == deg(v) and u < v) + DataSet<Tuple3<K, K, ByteValue>> filteredByDegree = pairDegrees + .map(new OrderByDegree<K, EV>()) + .setParallelism(littleParallelism) + .name("Order by degree") + .groupBy(0, 1) + .reduceGroup(new ReduceBitmask<K>()) + .setParallelism(littleParallelism) + .name("Flatten by degree"); + + // u, v, w, bitmask where (u, v) and (u, w) are edges in graph + DataSet<Tuple4<K, K, K, ByteValue>> triplets = filteredByDegree + .groupBy(0) + .sortGroup(1, Order.ASCENDING) + .reduceGroup(new GenerateTriplets<K>()) + .setParallelism(littleParallelism) + .name("Generate triplets"); + + // u, v, w, bitmask where (u, v), (u, w), and (v, w) are edges in graph + DataSet<Result<K>> triangles = triplets + .join(filteredByID, JoinOperatorBase.JoinHint.REPARTITION_HASH_SECOND) + .where(1, 2) + .equalTo(0, 1) + .with(new ProjectTriangles<K>()) + .setParallelism(littleParallelism) + .name("Triangle listing"); + + if (sortTriangleVertices) { + triangles = triangles + .map(new SortTriangleVertices<K>()) + .name("Sort triangle vertices"); + } + + return triangles; + } + + /** + * Removes edge values while emitting a Tuple3 where f0 and f1 are, + * respectively, the lesser and greater of the source and target IDs. + * The third field is a bitmask representing the vertex order. + * + * @param <T> ID type + * @param <ET> edge value type + */ + private static final class OrderByID<T extends Comparable<T>, ET> + implements MapFunction<Edge<T, ET>, Tuple3<T, T, ByteValue>> { + private ByteValue forward = new ByteValue(EdgeOrder.FORWARD.getBitmask()); + + private ByteValue reverse = new ByteValue(EdgeOrder.REVERSE.getBitmask()); + + private Tuple3<T, T, ByteValue> output = new Tuple3<>(); + + @Override + public Tuple3<T, T, ByteValue> map(Edge<T, ET> value) + throws Exception { + if (value.f0.compareTo(value.f1) < 0) { + output.f0 = value.f0; + output.f1 = value.f1; + output.f2 = forward; + } else { + output.f0 = value.f1; + output.f1 = value.f0; + output.f2 = reverse; + } + + return output; + } + } + + /** + * Reduce bitmasks to a single value using bitwise-or. + * + * @param <T> ID type + */ + @ForwardedFields("0; 1") + private static final class ReduceBitmask<T> + implements GroupReduceFunction<Tuple3<T, T, ByteValue>, Tuple3<T, T, ByteValue>> { + @Override + public void reduce(Iterable<Tuple3<T, T, ByteValue>> values, Collector<Tuple3<T, T, ByteValue>> out) + throws Exception { + Tuple3<T, T, ByteValue> output = null; + + byte bitmask = 0; + + for (Tuple3<T, T, ByteValue> value: values) { + output = value; + bitmask |= value.f2.getValue(); + } + + output.f2.setValue(bitmask); + out.collect(output); + } + } + + /** + * Removes edge values while emitting a Tuple3 where f0 and f1 are, + * respectively, the lesser and greater of the source and target IDs + * by degree count. 
If the source and target vertex degrees are equal + * then the IDs are compared and emitted in order. The third field is + * a bitmask representing the vertex order. + * + * @param <T> ID type + * @param <ET> edge value type + */ + private static final class OrderByDegree<T extends Comparable<T>, ET> + implements MapFunction<Edge<T, Tuple3<ET, Degrees, Degrees>>, Tuple3<T, T, ByteValue>> { + private ByteValue forward = new ByteValue((byte)(EdgeOrder.FORWARD.getBitmask() << 2)); + + private ByteValue reverse = new ByteValue((byte)(EdgeOrder.REVERSE.getBitmask() << 2)); + + private Tuple3<T, T, ByteValue> output = new Tuple3<>(); + + @Override + public Tuple3<T, T, ByteValue> map(Edge<T, Tuple3<ET, Degrees, Degrees>> value) + throws Exception { + Tuple3<ET, Degrees, Degrees> degrees = value.f2; + long sourceDegree = degrees.f1.getDegree().getValue(); + long targetDegree = degrees.f2.getDegree().getValue(); + + if (sourceDegree < targetDegree || + (sourceDegree == targetDegree && value.f0.compareTo(value.f1) < 0)) { + output.f0 = value.f0; + output.f1 = value.f1; + output.f2 = forward; + } else { + output.f0 = value.f1; + output.f1 = value.f0; + output.f2 = reverse; + } + + return output; + } + } + + /** + * Generates the set of triplets by the pairwise enumeration of the open + * neighborhood for each vertex. The number of triplets is quadratic in + * the vertex degree; however, data skew is minimized by only generating + * triplets from the vertex with least degree. + * + * @param <T> ID type + */ + @ForwardedFields("0") + private static final class GenerateTriplets<T extends CopyableValue<T>> + implements GroupReduceFunction<Tuple3<T, T, ByteValue>, Tuple4<T, T, T, ByteValue>> { + private Tuple4<T, T, T, ByteValue> output = new Tuple4<>(null, null, null, new ByteValue()); + + private List<Tuple2<T, ByteValue>> visited = new ArrayList<>(); + + @Override + public void reduce(Iterable<Tuple3<T, T, ByteValue>> values, Collector<Tuple4<T, T, T, ByteValue>> out) + throws Exception { + int visitedCount = 0; + + Iterator<Tuple3<T, T, ByteValue>> iter = values.iterator(); + + while (true) { + Tuple3<T, T, ByteValue> edge = iter.next(); + byte bitmask = edge.f2.getValue(); + + output.f0 = edge.f0; + output.f2 = edge.f1; + + for (int i = 0; i < visitedCount; i++) { + Tuple2<T, ByteValue> previous = visited.get(i); + + output.f1 = previous.f0; + output.f3.setValue((byte)(previous.f1.getValue() | bitmask)); + + // u, v, w, bitmask + out.collect(output); + } + + if (! iter.hasNext()) { + break; + } + + byte shiftedBitmask = (byte)(bitmask << 2); + + if (visitedCount == visited.size()) { + visited.add(new Tuple2<>(edge.f1.copy(), new ByteValue(shiftedBitmask))); + } else { + Tuple2<T, ByteValue> update = visited.get(visitedCount); + edge.f1.copyTo(update.f0); + update.f1.setValue(shiftedBitmask); + } + + visitedCount += 1; + } + } + } + + /** + * Simply project the triplet as a triangle while collapsing triplet and edge bitmasks. 
+ * + * @param <T> ID type + */ + @ForwardedFieldsFirst("0; 1; 2") + @ForwardedFieldsSecond("0; 1") + private static final class ProjectTriangles<T> + implements JoinFunction<Tuple4<T, T, T, ByteValue>, Tuple3<T, T, ByteValue>, Result<T>> { + private Result<T> output = new Result<>(null, null, null, new ByteValue()); + + @Override + public Result<T> join(Tuple4<T, T, T, ByteValue> triplet, Tuple3<T, T, ByteValue> edge) + throws Exception { + output.f0 = triplet.f0; + output.f1 = triplet.f1; + output.f2 = triplet.f2; + output.f3.setValue((byte)(triplet.f3.getValue() | edge.f2.getValue())); + return output; + } + } + + /** + * Reorders the vertices of each emitted triangle (K0, K1, K2, bitmask) + * into sorted order such that K0 < K1 < K2. + * + * @param <T> ID type + */ + private static final class SortTriangleVertices<T extends Comparable<T>> + implements MapFunction<Result<T>, Result<T>> { + @Override + public Result<T> map(Result<T> value) + throws Exception { + // by the triangle listing algorithm we know f1 < f2 + if (value.f0.compareTo(value.f1) > 0) { + byte bitmask = value.f3.getValue(); + + T temp_val = value.f0; + value.f0 = value.f1; + + if (temp_val.compareTo(value.f2) < 0) { + value.f1 = temp_val; + + int f0f1 = ((bitmask & 0b100000) >>> 1) | ((bitmask & 0b010000) << 1); + int f0f2 = (bitmask & 0b001100) >>> 2; + int f1f2 = (bitmask & 0b000011) << 2; + + value.f3.setValue((byte)(f0f1 | f0f2 | f1f2)); + } else { + value.f1 = value.f2; + value.f2 = temp_val; + + int f0f1 = (bitmask & 0b000011) << 4; + int f0f2 = ((bitmask & 0b100000) >>> 3) | ((bitmask & 0b010000) >>> 1); + int f1f2 = ((bitmask & 0b001000) >>> 3) | ((bitmask & 0b000100) >>> 1); + + value.f3.setValue((byte)(f0f1 | f0f2 | f1f2)); + } + } + + return value; + } + } + + /** + * Wraps the vertex type to encapsulate results from the Triangle Listing algorithm. + * + * @param <T> ID type + */ + public static class Result<T> + extends Tuple4<T, T, T, ByteValue> { + /** + * No-args constructor. + */ + public Result() {} + + /** + * Populates parent tuple with constructor parameters. + * + * @param value0 1st triangle vertex ID + * @param value1 2nd triangle vertex ID + * @param value2 3rd triangle vertex ID + * @param value3 bitmask indicating presence of six possible edges between triangle vertices + */ + public Result(T value0, T value1, T value2, ByteValue value3) { + super(value0, value1, value2, value3); + } + + /** + * Format values into a human-readable string. 
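For orientation, an illustrative sketch that is not part of this commit: the six-bit mask carried by Result packs one two-bit field per vertex pair, (f0, f1) in bits 5-4, (f0, f2) in bits 3-2, and (f1, f2) in bits 1-0, with 01 for a forward edge, 10 for a reverse edge, and 11 for a mutual pair, matching the maskToString decoding below and the bit groups permuted by SortTriangleVertices above. A triangle built from the directed edges 0->1, 1->2, and 2->0 would therefore carry the mask 0b011001:

public class TriangleBitmaskSketch {

	public static void main(String[] args) {
		// hypothetical mask for triangle (0, 1, 2) from edges 0->1, 1->2, 2->0:
		// (f0,f1) forward, (f0,f2) reverse, (f1,f2) forward
		byte bitmask = 0b011001;

		// index 0 ("??") cannot occur for a listed triangle; every pair has an edge
		String[] symbols = { "??", "->", "<-", "<->" };

		System.out.println("f0 " + symbols[(bitmask >>> 4) & 0b11] + " f1"); // ->
		System.out.println("f0 " + symbols[(bitmask >>> 2) & 0b11] + " f2"); // <-
		System.out.println("f1 " + symbols[bitmask & 0b11] + " f2");         // ->
	}
}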
+ * + * @return verbose string + */ + public String toVerboseString() { + byte bitmask = f3.getValue(); + + return "1st vertex ID: " + f0 + + ", 2nd vertex ID: " + f1 + + ", 3rd vertex ID: " + f2 + + ", edge directions: " + f0 + maskToString(bitmask, 4) + f1 + + ", " + f0 + maskToString(bitmask, 2) + f2 + + ", " + f1 + maskToString(bitmask, 0) + f2; + } + + private String maskToString(byte mask, int shift) { + switch((mask >>> shift) & 0b000011) { + case 0b01: + // EdgeOrder.FORWARD + return "->"; + case 0b10: + // EdgeOrder.REVERSE + return "<-"; + case 0b11: + // EdgeOrder.MUTUAL + return "<->"; + default: + throw new IllegalArgumentException("Bitmask is missing an edge (mask = " + + mask + ", shift = " + shift); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java index fc89e43..c11cb3c 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java @@ -58,6 +58,13 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { return this; } + /* + * Implementation notes: + * + * The requirement that "K extends CopyableValue<K>" can be removed when + * removed from TriangleListing. + */ + @Override public GlobalClusteringCoefficient<K, VV, EV> run(Graph<K, VV, EV> input) throws Exception { @@ -78,7 +85,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { @Override public Result getResult() { - return new Result(vertexMetrics.getResult().getNumberOfTriplets(), 3 * triangleCount.getResult()); + // each triangle is counted from each of the three vertices + long numberOfTriangles = 3 * triangleCount.getResult(); + + return new Result(vertexMetrics.getResult().getNumberOfTriplets(), numberOfTriangles); } /** @@ -86,8 +96,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { */ public static class Result { private long tripletCount; + private long triangleCount; + /** + * Instantiate an immutable result. + * + * @param tripletCount triplet count + * @param triangleCount triangle count + */ public Result(long tripletCount, long triangleCount) { this.tripletCount = tripletCount; this.triangleCount = triangleCount; @@ -121,13 +138,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { * * @return global clustering coefficient score */ - public double getLocalClusteringCoefficientScore() { + public double getGlobalClusteringCoefficientScore() { return (tripletCount == 0) ? 
Double.NaN : triangleCount / (double)tripletCount; } @Override public String toString() { - return "triplet count: " + tripletCount + ", triangle count:" + triangleCount; + return "triplet count: " + tripletCount + + ", triangle count: " + triangleCount + + ", global clustering coefficient: " + getGlobalClusteringCoefficientScore(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java index bc62d36..6858818 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java @@ -102,7 +102,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> { // u, triangle count DataSet<Tuple2<K, LongValue>> vertexTriangleCount = triangleVertices .groupBy(0) - .reduce(new CountVertices<K>()) + .reduce(new CountTriangles<K>()) .name("Count triangles"); // u, deg(u) @@ -145,12 +145,12 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> { } /** - * Combines the count of each vertex ID. + * Sums the triangle count for each vertex ID. * * @param <T> ID type */ @FunctionAnnotation.ForwardedFields("0") - private static class CountVertices<T> + private static class CountTriangles<T> implements ReduceFunction<Tuple2<T, LongValue>> { @Override public Tuple2<T, LongValue> reduce(Tuple2<T, LongValue> left, Tuple2<T, LongValue> right) @@ -185,7 +185,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> { } /** - * Wraps the vertex type to encapsulate results from the local clustering coefficient algorithm. + * Wraps the vertex type to encapsulate results from the Local Clustering Coefficient algorithm. * * @param <T> ID type */ @@ -195,9 +195,6 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> { private Murmur3_32 hasher = new Murmur3_32(HASH_SEED); - /** - * The no-arg constructor instantiates contained objects. - */ public Result() { f1 = new Tuple2<>(); } @@ -238,6 +235,11 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> { return (neighborPairs == 0) ? Double.NaN : getTriangleCount().getValue() / (double)neighborPairs; } + /** + * Format values into a human-readable string. 
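As a worked example of the two scores touched above, for orientation only and not part of this commit: the global clustering coefficient divides the triangle count by the triplet count, with the factor of three reflecting that each distinct triangle closes a triplet at each of its three vertices; the local clustering coefficient divides the number of triangles containing a vertex by that vertex's d * (d - 1) / 2 neighbor pairs, so a vertex of degree 4 lying in 3 triangles scores 3 / 6 = 0.5, while a vertex of degree 0 or 1 has no neighbor pairs and scores NaN.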
+ * + * @return verbose string + */ public String toVerboseString() { return "Vertex ID: " + f0 + ", vertex degree: " + getDegree() http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java index bc43725..46e1875 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java @@ -66,7 +66,9 @@ extends AbstractGraphAnalytic<K, VV, EV, Long> { .setSortTriangleVertices(false) .setLittleParallelism(littleParallelism)); - triangles.output(new CountHelper<Tuple3<K, K, K>>(id)).name("Count triangles"); + triangles + .output(new CountHelper<Tuple3<K, K, K>>(id)) + .name("Count triangles"); return this; } http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java index 6245433..4f8ce7a 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java @@ -220,6 +220,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> { * * @param <T> ID type */ + @ForwardedFields("0") private static final class GenerateTriplets<T extends CopyableValue<T>> implements GroupReduceFunction<Tuple2<T, T>, Tuple3<T, T, T>> { private Tuple3<T, T, T> output = new Tuple3<>(); @@ -269,9 +270,9 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> { private static final class ProjectTriangles<T> implements JoinFunction<Tuple3<T, T, T>, Tuple2<T, T>, Tuple3<T, T, T>> { @Override - public Tuple3<T, T, T> join(Tuple3<T, T, T> first, Tuple2<T, T> second) + public Tuple3<T, T, T> join(Tuple3<T, T, T> triplet, Tuple2<T, T> edge) throws Exception { - return first; + return triplet; } } http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java new file mode 100644 index 0000000..1c7df7e --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java @@ -0,0 +1,238 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. 
See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.graph.library.metric.directed; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.io.RichOutputFormat; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.graph.AbstractGraphAnalytic; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees; +import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; +import org.apache.flink.graph.library.metric.directed.VertexMetrics.Result; +import org.apache.flink.types.CopyableValue; +import org.apache.flink.util.AbstractID; + +import java.io.IOException; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + +/** + * Compute the number of vertices, number of edges, and number of triplets in + * a directed graph. + * + * @param <K> graph ID type + * @param <VV> vertex value type + * @param <EV> edge value type + */ +public class VertexMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV> +extends AbstractGraphAnalytic<K, VV, EV, Result> { + + private String id = new AbstractID().toString(); + + // Optional configuration + private boolean includeZeroDegreeVertices = false; + + private int parallelism = PARALLELISM_DEFAULT; + + /** + * By default only the edge set is processed for the computation of degree. + * When this flag is set an additional join is performed against the vertex + * set in order to output vertices with a degree of zero. + * + * @param includeZeroDegreeVertices whether to output vertices with a + * degree of zero + * @return this + */ + public VertexMetrics<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) { + this.includeZeroDegreeVertices = includeZeroDegreeVertices; + + return this; + } + + /** + * Override the operator parallelism. 
+ * + * @param parallelism operator parallelism + * @return this + */ + public VertexMetrics<K, VV, EV> setParallelism(int parallelism) { + this.parallelism = parallelism; + + return this; + } + + @Override + public VertexMetrics<K, VV, EV> run(Graph<K, VV, EV> input) + throws Exception { + super.run(input); + + DataSet<Vertex<K, Degrees>> vertexDegree = input + .run(new VertexDegrees<K, VV, EV>() + .setIncludeZeroDegreeVertices(includeZeroDegreeVertices) + .setParallelism(parallelism)); + + vertexDegree + .output(new VertexMetricsHelper<K>(id)) + .name("Vertex metrics"); + + return this; + } + + @Override + public Result getResult() { + JobExecutionResult res = env.getLastJobExecutionResult(); + + long vertexCount = res.getAccumulatorResult(id + "-0"); + long edgeCount = res.getAccumulatorResult(id + "-1"); + long tripletCount = res.getAccumulatorResult(id + "-2"); + + return new Result(vertexCount, edgeCount / 2, tripletCount); + } + + /** + * Helper class to collect vertex metrics. + * + * @param <T> ID type + */ + private static class VertexMetricsHelper<T> + extends RichOutputFormat<Vertex<T, Degrees>> { + private final String id; + + private long vertexCount; + private long edgeCount; + private long tripletCount; + + /** + * This helper class collects vertex metrics by scanning over and + * discarding elements from the given DataSet. + * + * The unique id is required because Flink's accumulator namespace is + * among all operators. + * + * @param id unique string used for accumulator names + */ + public VertexMetricsHelper(String id) { + this.id = id; + } + + @Override + public void configure(Configuration parameters) {} + + @Override + public void open(int taskNumber, int numTasks) throws IOException {} + + @Override + public void writeRecord(Vertex<T, Degrees> record) throws IOException { + long degree = record.f1.getDegree().getValue(); + long outDegree = record.f1.getOutDegree().getValue(); + + vertexCount++; + edgeCount += outDegree; + tripletCount += degree * (degree - 1) / 2; + } + + @Override + public void close() throws IOException { + getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount)); + getRuntimeContext().addAccumulator(id + "-1", new LongCounter(edgeCount)); + getRuntimeContext().addAccumulator(id + "-2", new LongCounter(tripletCount)); + } + } + + /** + * Wraps vertex metrics. + */ + public static class Result { + private long vertexCount; + private long edgeCount; + private long tripletCount; + + public Result(long vertexCount, long edgeCount, long tripletCount) { + this.vertexCount = vertexCount; + this.edgeCount = edgeCount; + this.tripletCount = tripletCount; + } + + /** + * Get the number of vertices. + * + * @return number of vertices + */ + public long getNumberOfVertices() { + return vertexCount; + } + + /** + * Get the number of edges. + * + * @return number of edges + */ + public long getNumberOfEdges() { + return edgeCount; + } + + /** + * Get the number of triplets. 
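As a quick check on the accumulator logic above, illustrative only: each vertex contributes one to the vertex count, its out-degree to the edge count, and degree * (degree - 1) / 2 to the triplet count, one triplet per unordered pair of its neighbors; a vertex with degree 4 therefore adds 4 * 3 / 2 = 6 triplets.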
+ * + * @return number of triplets + */ + public long getNumberOfTriplets() { + return tripletCount; + } + + @Override + public String toString() { + return "vertex count: " + vertexCount + + ", edge count:" + edgeCount + + ", triplet count: " + tripletCount; + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(vertexCount) + .append(edgeCount) + .append(tripletCount) + .hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { return false; } + if (obj == this) { return true; } + if (obj.getClass() != getClass()) { return false; } + + Result rhs = (Result)obj; + + return new EqualsBuilder() + .append(vertexCount, rhs.vertexCount) + .append(edgeCount, rhs.edgeCount) + .append(tripletCount, rhs.tripletCount) + .isEquals(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java index 41ae27a..3c26e43 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java @@ -111,7 +111,9 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> { .setReduceOnTargetId(reduceOnTargetId) .setParallelism(parallelism)); - vertexDegree.output(new VertexMetricsHelper<K>(id)).name("Vertex metrics"); + vertexDegree + .output(new VertexMetricsHelper<K>(id)) + .name("Vertex metrics"); return this; } http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java index 98f2b97..6c3b5ab 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java @@ -67,11 +67,11 @@ public class Murmur3_32 implements Serializable { count++; input *= 0xcc9e2d51; - input = input << 15; + input = Integer.rotateLeft(input, 15); input *= 0x1b873593; hash ^= input; - hash = hash << 13; + hash = Integer.rotateLeft(hash, 13); hash = hash * 5 + 0xe6546b64; return this;
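One note on the Murmur3_32 change above, as an illustrative aside that is not part of the commit: the reference MurmurHash3 x86_32 mixing uses 32-bit rotations, and a plain left shift discards the high-order bits that a rotation wraps back in, which is what the switch to Integer.rotateLeft restores. A minimal sketch:

public class RotateVersusShift {

	public static void main(String[] args) {
		int k = 0x80000001;

		System.out.printf("shift:  %08x%n", k << 15);                   // 00008000, high bits lost
		System.out.printf("rotate: %08x%n", Integer.rotateLeft(k, 15)); // 0000c000, high bits wrap around
	}
}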