[FLINK-3907] [gelly] Directed Clustering Coefficient

This closes #2079


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d34bdaf7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d34bdaf7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d34bdaf7

Branch: refs/heads/master
Commit: d34bdaf7f1de4dd4f742867e6b9e879c8b631765
Parents: d92aeb7
Author: Greg Hogan <c...@greghogan.com>
Authored: Wed May 25 06:41:12 2016 -0400
Committer: EC2 Default User <ec2-user@ip-10-0-0-56.ec2.internal>
Committed: Fri Jun 24 17:34:38 2016 +0000

----------------------------------------------------------------------
 docs/apis/batch/libs/gelly.md                   |  10 +-
 .../java/org/apache/flink/api/java/Utils.java   |   2 +-
 .../graph/examples/ClusteringCoefficient.java   | 235 ++++++++++
 .../flink/graph/examples/JaccardIndex.java      |   9 +-
 .../examples/LocalClusteringCoefficient.java    | 136 ------
 .../flink/graph/examples/TriangleListing.java   | 182 +++++---
 .../org/apache/flink/graph/scala/Graph.scala    |   4 +-
 .../main/java/org/apache/flink/graph/Graph.java |   3 +-
 .../org/apache/flink/graph/GraphAnalytic.java   |   2 +-
 .../degree/annotate/directed/VertexDegrees.java |   2 +-
 .../directed/GlobalClusteringCoefficient.java   | 177 ++++++++
 .../directed/LocalClusteringCoefficient.java    | 271 ++++++++++++
 .../clustering/directed/TriangleCount.java      |  81 ++++
 .../clustering/directed/TriangleListing.java    | 440 +++++++++++++++++++
 .../undirected/GlobalClusteringCoefficient.java |  25 +-
 .../undirected/LocalClusteringCoefficient.java  |  16 +-
 .../clustering/undirected/TriangleCount.java    |   4 +-
 .../clustering/undirected/TriangleListing.java  |   5 +-
 .../library/metric/directed/VertexMetrics.java  | 238 ++++++++++
 .../metric/undirected/VertexMetrics.java        |   4 +-
 .../apache/flink/graph/utils/Murmur3_32.java    |   4 +-
 .../org/apache/flink/graph/asm/AsmTestBase.java |   6 +-
 .../annotate/directed/EdgeDegreesPairTest.java  |  18 +-
 .../directed/EdgeSourceDegreesTest.java         |  14 +-
 .../directed/EdgeTargetDegreesTest.java         |  16 +-
 .../annotate/directed/VertexDegreesTest.java    |  12 +-
 .../annotate/directed/VertexInDegreeTest.java   |   6 +-
 .../annotate/directed/VertexOutDegreeTest.java  |   6 +-
 .../GlobalClusteringCoefficientTest.java        |  86 ++++
 .../LocalClusteringCoefficientTest.java         |  85 ++++
 .../clustering/directed/TriangleCountTest.java  |  77 ++++
 .../directed/TriangleListingTest.java           |  84 ++++
 .../LocalClusteringCoefficientTest.java         |   8 +-
 .../metric/directed/VertexMetricsTest.java      |  99 +++++
 .../library/similarity/JaccardIndexTest.java    |   2 +-
 35 files changed, 2107 insertions(+), 262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index 45fdbe5..166d2d2 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -2112,8 +2112,9 @@ divided by the number of potential edges between neighbors.
 See the [Triangle Enumeration](#triangle-enumeration) library method for a detailed explanation of triangle enumeration.
 
 #### Usage
-The algorithm takes a simple, undirected graph as input and outputs a `DataSet` of tuples containing the vertex ID,
-vertex degree, and number of triangles containing the vertex. The graph ID type must be `Comparable` and `Copyable`.
+Directed and undirected variants are provided. The algorithms take a simple graph as input and output a `DataSet` of
+tuples containing the vertex ID, vertex degree, and number of triangles containing the vertex. The graph ID type must be
+`Comparable` and `Copyable`.
 
 ### Global Clustering Coefficient
 
@@ -2126,8 +2127,9 @@ See the [Local Clustering Coefficient](#local-clustering-coefficient) library me
 clustering coefficient.
 
 #### Usage
-The algorithm takes a simple, undirected graph as input and outputs a result containing the total number of triplets and
-triangles in the graph. The graph ID type must be `Comparable` and `Copyable`.
+Directed and undirected variants are provided. The algorithm takes a simple graph as input and outputs a result
+containing the total number of triplets and triangles in the graph. The graph ID type must be `Comparable` and
+`Copyable`.
 
 
 {% top %}

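For reference alongside the documentation change above, a minimal sketch of invoking the directed variants from the Java API. The wrapper class, the input path "/tmp/edges.csv", and the CSV options are illustrative assumptions; the library calls mirror the ClusteringCoefficient example driver further down in this commit. The undirected variants are used the same way from the org.apache.flink.graph.library.clustering.undirected package.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAnalytic;
import org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient;
import org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;

public class DirectedClusteringCoefficientSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // hypothetical edge list; any simple directed graph in CSV form works
        Graph<LongValue, NullValue, NullValue> graph = Graph
            .fromCsvReader("/tmp/edges.csv", env)
                .ignoreCommentsEdges("#")
                .keyType(LongValue.class);

        // per-vertex result: vertex ID, degrees, and triangle count
        DataSet<LocalClusteringCoefficient.Result<LongValue>> lcc = graph
            .run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>());

        // graph-wide triplet and triangle counts; read after the job has run
        GraphAnalytic<LongValue, NullValue, NullValue, GlobalClusteringCoefficient.Result> gcc = graph
            .run(new GlobalClusteringCoefficient<LongValue, NullValue, NullValue>());

        lcc.print();                          // triggers execution
        System.out.println(gcc.getResult());  // triplet count, triangle count, score
    }
}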
http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java 
b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
index 89b6d17..36ccb23 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -204,7 +204,7 @@ public final class Utils {
 
                @Override
                public int hashCode() {
-                       return (int) (this.count + this.hashCode());
+                       return (int) (this.count + this.checksum);
                }
 
                @Override

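For clarity, the corrected method reads as below; the comment spells out why the accumulator's checksum field (defined in the surrounding class) is combined with the count rather than calling hashCode() again.

    @Override
    public int hashCode() {
        // combine the element count with the accumulated checksum;
        // invoking hashCode() from inside hashCode() would recurse without bound
        return (int) (this.count + this.checksum);
    }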
http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
new file mode 100644
index 0000000..547ef97
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalytic;
+import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import java.text.NumberFormat;
+
+/**
+ * Driver for the library implementations of Global and Local Clustering Coefficient.
+ *
+ * This example reads a simple directed or undirected graph from a CSV file or
+ * generates an RMat graph with the given scale and edge factor then calculates
+ * the local clustering coefficient for each vertex and the global clustering
+ * coefficient for the graph.
+ *
+ * @see org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient
+ * @see org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient
+ * @see org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient
+ * @see org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient
+ */
+public class ClusteringCoefficient {
+
+       public static final int DEFAULT_SCALE = 10;
+
+       public static final int DEFAULT_EDGE_FACTOR = 16;
+
+       public static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+       private static void printUsage() {
+               System.out.println(WordUtils.wrap("The local clustering coefficient measures the connectedness of each" +
+                       " vertex's neighborhood and the global clustering coefficient measures the connectedness of the graph." +
+                       " Scores range from 0.0 (no edges between neighbors or vertices) to 1.0 (neighborhood or graph" +
+                       " is a clique).", 80));
+               System.out.println();
+               System.out.println(WordUtils.wrap("This algorithm returns tuples containing the vertex ID, the degree of" +
+                       " the vertex, and the number of edges between vertex neighbors.", 80));
+               System.out.println();
+               System.out.println("usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]>");
+               System.out.println();
+               System.out.println("options:");
+               System.out.println("  --input csv --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
+               System.out.println("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]");
+               System.out.println();
+               System.out.println("  --output print");
+               System.out.println("  --output hash");
+               System.out.println("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]");
+       }
+
+       public static void main(String[] args) throws Exception {
+               // Set up the execution environment
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().enableObjectReuse();
+
+               ParameterTool parameters = ParameterTool.fromArgs(args);
+               if (! parameters.has("directed")) {
+                       printUsage();
+                       return;
+               }
+               boolean directedAlgorithm = parameters.getBoolean("directed");
+
+               // global and local clustering coefficient results
+               GraphAnalytic gcc;
+               DataSet lcc;
+
+               switch (parameters.get("input", "")) {
+                       case "csv": {
+                               String lineDelimiter = 
StringEscapeUtils.unescapeJava(
+                                       parameters.get("input_line_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+                               String fieldDelimiter = 
StringEscapeUtils.unescapeJava(
+                                       parameters.get("input_field_delimiter", 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+                               Graph<LongValue, NullValue, NullValue> graph = 
Graph
+                                       
.fromCsvReader(parameters.get("input_filename"), env)
+                                               .ignoreCommentsEdges("#")
+                                               
.lineDelimiterEdges(lineDelimiter)
+                                               
.fieldDelimiterEdges(fieldDelimiter)
+                                               .keyType(LongValue.class);
+
+                               if (directedAlgorithm) {
+                                       gcc = graph
+                                               .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue,
 NullValue, NullValue>());
+                                       lcc = graph
+                                               .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>());
+                               } else {
+                                       gcc = graph
+                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue,
 NullValue, NullValue>());
+                                       lcc = graph
+                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>());
+                               }
+                       } break;
+
+                       case "rmat": {
+                               int scale = parameters.getInt("scale", 
DEFAULT_SCALE);
+                               int edgeFactor = 
parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
+
+                               RandomGenerableFactory<JDKRandomGenerator> rnd 
= new JDKRandomGeneratorFactory();
+
+                               long vertexCount = 1L << scale;
+                               long edgeCount = vertexCount * edgeFactor;
+
+                               Graph<LongValue, NullValue, NullValue> graph = 
new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+                                       .generate();
+
+                               if (directedAlgorithm) {
+                                       if (scale > 32) {
+                                               Graph<LongValue, NullValue, 
NullValue> newGraph = graph
+                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, 
NullValue>());
+
+                                               gcc = newGraph
+                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue,
 NullValue, NullValue>());
+                                               lcc = newGraph
+                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>());
+                                       } else {
+                                               Graph<IntValue, NullValue, 
NullValue> newGraph = graph
+                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToIntValue()))
+                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, 
NullValue>());
+
+                                               gcc = newGraph
+                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue,
 NullValue, NullValue>());
+                                               lcc = newGraph
+                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue,
 NullValue, NullValue>());
+                                       }
+                               } else {
+                                       boolean clipAndFlip = 
parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+
+                                       if (scale > 32) {
+                                               Graph<LongValue, NullValue, 
NullValue> newGraph = graph
+                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, 
NullValue>(clipAndFlip));
+
+                                               gcc = newGraph
+                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue,
 NullValue, NullValue>());
+                                               lcc = newGraph
+                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>());
+                                       } else {
+                                               Graph<IntValue, NullValue, 
NullValue> newGraph = graph
+                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToIntValue()))
+                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, 
NullValue>(clipAndFlip));
+
+                                               gcc = newGraph
+                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue,
 NullValue, NullValue>());
+                                               lcc = newGraph
+                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue,
 NullValue, NullValue>());
+                                       }
+                               }
+                       } break;
+
+                       default:
+                               printUsage();
+                               return;
+               }
+
+               switch (parameters.get("output", "")) {
+                       case "print":
+                               if (directedAlgorithm) {
+                                       for (Object e: lcc.collect()) {
+                                               
org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result
 result =
+                                                       
(org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result)e;
+                                               
System.out.println(result.toVerboseString());
+                                       }
+                               } else {
+                                       for (Object e: lcc.collect()) {
+                                               
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result
 result =
+                                                       
(org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result)e;
+                                               
System.out.println(result.toVerboseString());
+                                       }
+                               }
+                               System.out.println(gcc.getResult());
+                               break;
+
+                       case "hash":
+                               
System.out.println(DataSetUtils.checksumHashCode(lcc));
+                               System.out.println(gcc.getResult());
+                               break;
+
+                       case "csv":
+                               String filename = 
parameters.get("output_filename");
+
+                               String lineDelimiter = 
StringEscapeUtils.unescapeJava(
+                                       parameters.get("output_line_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+                               String fieldDelimiter = 
StringEscapeUtils.unescapeJava(
+                                       
parameters.get("output_field_delimiter", 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+                               lcc.writeAsCsv(filename, lineDelimiter, 
fieldDelimiter);
+
+                               System.out.println(gcc.execute());
+                               break;
+
+                       default:
+                               printUsage();
+                               return;
+               }
+
+               JobExecutionResult result = env.getLastJobExecutionResult();
+
+               NumberFormat nf = NumberFormat.getInstance();
+               System.out.println("Execution runtime: " + 
nf.format(result.getNetRuntime()) + " ms");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
index c078d73..46a296a 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
@@ -97,10 +97,10 @@ public class JaccardIndex {
 
                                Graph<LongValue, NullValue, NullValue> graph = 
Graph
                                        
.fromCsvReader(parameters.get("input_filename"), env)
-                                       .ignoreCommentsEdges("#")
-                                       .lineDelimiterEdges(lineDelimiter)
-                                       .fieldDelimiterEdges(fieldDelimiter)
-                                       .keyType(LongValue.class);
+                                               .ignoreCommentsEdges("#")
+                                               
.lineDelimiterEdges(lineDelimiter)
+                                               
.fieldDelimiterEdges(fieldDelimiter)
+                                               .keyType(LongValue.class);
 
                                ji = graph
                                        .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, 
NullValue>());
@@ -162,6 +162,7 @@ public class JaccardIndex {
 
                                env.execute();
                                break;
+
                        default:
                                printUsage();
                                return;

http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
deleted file mode 100644
index bed68b2..0000000
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.asm.simple.undirected.Simplify;
-import org.apache.flink.graph.asm.translate.LongValueToIntValue;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-
-import java.text.NumberFormat;
-
-/**
- * Driver for the library implementation of Local Clustering Coefficient.
- *
- * This example generates an undirected RMat graph with the given scale and
- * edge factor then calculates the local clustering coefficient for each 
vertex.
- *
- * @see 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient
- */
-public class LocalClusteringCoefficient {
-
-       public static final int DEFAULT_SCALE = 10;
-
-       public static final int DEFAULT_EDGE_FACTOR = 16;
-
-       public static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
-       public static void main(String[] args) throws Exception {
-               // Set up the execution environment
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableObjectReuse();
-
-               ParameterTool parameters = ParameterTool.fromArgs(args);
-
-               // Generate RMat graph
-               int scale = parameters.getInt("scale", DEFAULT_SCALE);
-               int edgeFactor = parameters.getInt("edge_factor", 
DEFAULT_EDGE_FACTOR);
-
-               RandomGenerableFactory<JDKRandomGenerator> rnd = new 
JDKRandomGeneratorFactory();
-
-               long vertexCount = 1L << scale;
-               long edgeCount = vertexCount * edgeFactor;
-
-               boolean clipAndFlip = parameters.getBoolean("clip_and_flip", 
DEFAULT_CLIP_AND_FLIP);
-
-               Graph<LongValue, NullValue, NullValue> graph = new 
RMatGraph<>(env, rnd, vertexCount, edgeCount)
-                       .generate()
-                       .run(new Simplify<LongValue, NullValue, 
NullValue>(clipAndFlip));
-
-               DataSet cc;
-
-               if (scale > 32) {
-                       cc = graph
-                               .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue,
 NullValue, NullValue>());
-               } else {
-                       cc = graph
-                               .run(new TranslateGraphIds<LongValue, IntValue, 
NullValue, NullValue>(new LongValueToIntValue()))
-                               .run(new 
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue,
 NullValue, NullValue>());
-               }
-
-               switch (parameters.get("output", "")) {
-               case "print":
-                       for (Object e: cc.collect()) {
-                               Result result = (Result)e;
-                               System.out.println(result.toVerboseString());
-                       }
-                       break;
-
-               case "hash":
-                       System.out.println(DataSetUtils.checksumHashCode(cc));
-                       break;
-
-               case "csv":
-                       String filename = parameters.get("filename");
-
-                       String row_delimiter = parameters.get("row_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER);
-                       String field_delimiter = 
parameters.get("field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
-
-                       cc.writeAsCsv(filename, row_delimiter, field_delimiter);
-
-                       env.execute();
-                       break;
-               default:
-                       System.out.println(WordUtils.wrap("The local clustering 
coefficient measures the connectedness of each" +
-                                       " vertex's neighborhood. Scores range 
from 0.0 (no edges between neighbors) to 1.0 (neighborhood" +
-                                       " is a clique).", 80));
-                       System.out.println();
-                       System.out.println(WordUtils.wrap("This algorithm 
returns tuples containing the vertex ID, the degree of" +
-                                       " the vertex, the number of edges 
between vertex neighbors, and the local clustering coefficient.", 80));
-                       System.out.println();
-                       System.out.println("usage:");
-                       System.out.println("  LocalClusteringCoefficient 
[--scale SCALE] [--edge_factor EDGE_FACTOR] --output print");
-                       System.out.println("  LocalClusteringCoefficient 
[--scale SCALE] [--edge_factor EDGE_FACTOR] --output hash");
-                       System.out.println("  LocalClusteringCoefficient 
[--scale SCALE] [--edge_factor EDGE_FACTOR] --output csv" +
-                               " --filename FILENAME [--row_delimiter 
ROW_DELIMITER] [--field_delimiter FIELD_DELIMITER]");
-
-                       return;
-               }
-
-               JobExecutionResult result = env.getLastJobExecutionResult();
-
-               NumberFormat nf = NumberFormat.getInstance();
-               System.out.println("Execution runtime: " + 
nf.format(result.getNetRuntime()) + " ms");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
index a20bf20..9bd41f0 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.graph.examples;
 
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.text.WordUtils;
 import org.apache.commons.math3.random.JDKRandomGenerator;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.DataSet;
@@ -41,9 +43,11 @@ import java.text.NumberFormat;
 /**
  * Driver for the library implementation of Triangle Listing.
  *
- * This example generates an undirected RMat graph with the given scale
- * and edge factor then lists all triangles.
+ * This example reads a simple directed or undirected graph from a CSV file or
+ * generates an RMat graph with the given scale and edge factor then lists
+ * all triangles.
  *
+ * @see org.apache.flink.graph.library.clustering.directed.TriangleListing
  * @see org.apache.flink.graph.library.clustering.undirected.TriangleListing
  */
 public class TriangleListing {
@@ -54,68 +58,142 @@ public class TriangleListing {
 
        public static final boolean DEFAULT_CLIP_AND_FLIP = true;
 
+       private static void printUsage() {
+               System.out.println(WordUtils.wrap("Lists all triangles in a graph.", 80));
+               System.out.println();
+               System.out.println(WordUtils.wrap("This algorithm returns tuples containing the vertex IDs for each triangle and" +
+                       " for directed graphs a bitmask indicating the presence of the six potential connecting edges.", 80));
+               System.out.println();
+               System.out.println("usage: TriangleListing --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]>");
+               System.out.println();
+               System.out.println("options:");
+               System.out.println("  --input csv --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
+               System.out.println("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]");
+               System.out.println();
+               System.out.println("  --output print");
+               System.out.println("  --output hash");
+               System.out.println("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]");
+       }
+
        public static void main(String[] args) throws Exception {
                // Set up the execution environment
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                env.getConfig().enableObjectReuse();
 
                ParameterTool parameters = ParameterTool.fromArgs(args);
-
-               // Generate RMat graph
-               int scale = parameters.getInt("scale", DEFAULT_SCALE);
-               int edgeFactor = parameters.getInt("edge_factor", 
DEFAULT_EDGE_FACTOR);
-
-               RandomGenerableFactory<JDKRandomGenerator> rnd = new 
JDKRandomGeneratorFactory();
-
-               long vertexCount = 1L << scale;
-               long edgeCount = vertexCount * edgeFactor;
-
-               boolean clipAndFlip = parameters.getBoolean("clip_and_flip", 
DEFAULT_CLIP_AND_FLIP);
-
-               Graph<LongValue, NullValue, NullValue> graph = new 
RMatGraph<>(env, rnd, vertexCount, edgeCount)
-                       .generate()
-                       .run(new Simplify<LongValue, NullValue, 
NullValue>(clipAndFlip));
+               if (! parameters.has("directed")) {
+                       printUsage();
+                       return;
+               }
+               boolean directedAlgorithm = parameters.getBoolean("directed");
 
                DataSet tl;
 
-               if (scale > 32) {
-                       tl = graph
-                               .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, 
NullValue, NullValue>());
-               } else {
-                       tl = graph
-                               .run(new TranslateGraphIds<LongValue, IntValue, 
NullValue, NullValue>(new LongValueToIntValue()))
-                               .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, 
NullValue, NullValue>());
+               switch (parameters.get("input", "")) {
+                       case "csv": {
+                               String lineDelimiter = 
StringEscapeUtils.unescapeJava(
+                                       parameters.get("input_line_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+                               String fieldDelimiter = 
StringEscapeUtils.unescapeJava(
+                                       parameters.get("input_field_delimiter", 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+                               Graph<LongValue, NullValue, NullValue> graph = 
Graph
+                                       
.fromCsvReader(parameters.get("input_filename"), env)
+                                               .ignoreCommentsEdges("#")
+                                               
.lineDelimiterEdges(lineDelimiter)
+                                               
.fieldDelimiterEdges(fieldDelimiter)
+                                               .keyType(LongValue.class);
+
+                               if (directedAlgorithm) {
+                                       tl = graph
+                                               .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, 
NullValue, NullValue>());
+                               } else {
+                                       tl = graph
+                                               .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, 
NullValue, NullValue>());
+                               }
+
+                       } break;
+
+                       case "rmat": {
+                               int scale = parameters.getInt("scale", 
DEFAULT_SCALE);
+                               int edgeFactor = 
parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
+
+                               RandomGenerableFactory<JDKRandomGenerator> rnd 
= new JDKRandomGeneratorFactory();
+
+                               long vertexCount = 1L << scale;
+                               long edgeCount = vertexCount * edgeFactor;
+
+                               Graph<LongValue, NullValue, NullValue> graph = 
new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+                                       .generate();
+
+                               if (directedAlgorithm) {
+                                       if (scale > 32) {
+                                               tl = graph
+                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, 
NullValue>())
+                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, 
NullValue, NullValue>());
+                                       } else {
+                                               tl = graph
+                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToIntValue()))
+                                                       .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, 
NullValue>())
+                                                       .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<IntValue, 
NullValue, NullValue>());
+                                       }
+                               } else {
+                                       boolean clipAndFlip = 
parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+
+                                       graph = graph
+                                               .run(new Simplify<LongValue, 
NullValue, NullValue>(clipAndFlip));
+
+                                       if (scale > 32) {
+                                               tl = graph
+                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, 
NullValue>(clipAndFlip))
+                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, 
NullValue, NullValue>());
+                                       } else {
+                                               tl = graph
+                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToIntValue()))
+                                                       .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, 
NullValue>(clipAndFlip))
+                                                       .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, 
NullValue, NullValue>());
+                                       }
+                               }
+                       } break;
+
+                       default:
+                               printUsage();
+                               return;
                }
 
                switch (parameters.get("output", "")) {
-               case "print":
-                       tl.print();
-                       break;
-
-               case "hash":
-                       System.out.println(DataSetUtils.checksumHashCode(tl));
-                       break;
-
-               case "csv":
-                       String filename = parameters.get("filename");
-
-                       String row_delimiter = parameters.get("row_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER);
-                       String field_delimiter = 
parameters.get("field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
-
-                       tl.writeAsCsv(filename, row_delimiter, field_delimiter);
-
-                       env.execute();
-                       break;
-               default:
-                       System.out.println("Lists all distinct triangles in the 
generated RMat graph.");
-                       System.out.println();
-                       System.out.println("usage:");
-                       System.out.println("  TriangleListing [--scale SCALE] 
[--edge_factor EDGE_FACTOR] --output print");
-                       System.out.println("  TriangleListing [--scale SCALE] 
[--edge_factor EDGE_FACTOR] --output hash");
-                       System.out.println("  TriangleListing [--scale SCALE] 
[--edge_factor EDGE_FACTOR] --output csv" +
-                               " --filename FILENAME [--row_delimiter 
ROW_DELIMITER] [--field_delimiter FIELD_DELIMITER]");
-
-                       return;
+                       case "print":
+                               if (directedAlgorithm) {
+                                       for (Object e: tl.collect()) {
+                                               
org.apache.flink.graph.library.clustering.directed.TriangleListing.Result 
result =
+                                                       
(org.apache.flink.graph.library.clustering.directed.TriangleListing.Result) e;
+                                               
System.out.println(result.toVerboseString());
+                                       }
+                               } else {
+                                       tl.print();
+                               }
+                               break;
+
+                       case "hash":
+                               
System.out.println(DataSetUtils.checksumHashCode(tl));
+                               break;
+
+                       case "csv":
+                               String filename = 
parameters.get("output_filename");
+
+                               String lineDelimiter = 
StringEscapeUtils.unescapeJava(
+                                       parameters.get("output_line_delimiter", 
CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+                               String fieldDelimiter = 
StringEscapeUtils.unescapeJava(
+                                       
parameters.get("output_field_delimiter", 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+                               tl.writeAsCsv(filename, lineDelimiter, 
fieldDelimiter);
+
+                               env.execute();
+                               break;
+                       default:
+                               printUsage();
+                               return;
                }
 
                JobExecutionResult result = env.getLastJobExecutionResult();

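As a companion to the driver changes above, a minimal sketch of calling the directed TriangleListing library method directly. The wrapper class and method name are illustrative assumptions; the graph argument is assumed to be a simple directed graph, and toVerboseString() is the same rendering the driver uses for its print output.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.library.clustering.directed.TriangleListing;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;

public class TriangleListingSketch {

    // 'graph' is assumed to be a simple directed graph built elsewhere
    static void listTriangles(Graph<LongValue, NullValue, NullValue> graph) throws Exception {
        DataSet<TriangleListing.Result<LongValue>> triangles = graph
            .run(new TriangleListing<LongValue, NullValue, NullValue>());

        // each result holds the three vertex IDs plus a bitmask marking which of
        // the six possible directed edges between them are present
        for (TriangleListing.Result<LongValue> triangle : triangles.collect()) {
            System.out.println(triangle.toVerboseString());
        }
    }
}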
http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
 
b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index 3881aae..165f6c2 100644
--- 
a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ 
b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -1127,8 +1127,10 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   *
   * @param analytic the analytic to run on the Graph
   */
-  def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, T])= {
+  def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, T]):
+  GraphAnalytic[K, VV, EV, T] = {
     jgraph.run(analytic)
+    analytic
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index dd25cfd..3dbb9c4 100755
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -1797,8 +1797,9 @@ public class Graph<K, VV, EV> {
         * @param <T> the result type
         * @throws Exception
         */
-       public <T> void run(GraphAnalytic<K, VV, EV, T> analytic) throws Exception {
+       public <T> GraphAnalytic<K, VV, EV, T> run(GraphAnalytic<K, VV, EV, T> analytic) throws Exception {
                analytic.run(this);
+               return analytic;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
index dd221dc..0bdc792 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
@@ -38,7 +38,7 @@ public interface GraphAnalytic<K, VV, EV, T> {
         * This method must be called after the program has executed:
         *  1) "run" analytics and algorithms
         *  2) call ExecutionEnvironment.execute()
-        *  3) get analytics results
+        *  3) get analytic results
         *
         * @return the result
         */

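Because Graph#run(GraphAnalytic) now returns the analytic (see the Graph.java change above), the three steps described in this javadoc can be written compactly. A sketch under stated assumptions: the wrapper class is illustrative, the graph passed in is assumed to be a simple directed graph, and execute() is used as the example drivers do in their csv branches.

import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAnalytic;
import org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;

public class AnalyticLifecycleSketch {

    static GlobalClusteringCoefficient.Result score(Graph<LongValue, NullValue, NullValue> graph) throws Exception {
        // 1) run the analytic; run() now hands the analytic back
        GraphAnalytic<LongValue, NullValue, NullValue, GlobalClusteringCoefficient.Result> gcc = graph
            .run(new GlobalClusteringCoefficient<LongValue, NullValue, NullValue>());

        // 2) execute the program
        gcc.execute();

        // 3) read the analytic result
        return gcc.getResult();
    }
}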
http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
index 71a0859..1f1d4ab 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
@@ -142,7 +142,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Degrees>>> {
        }
 
        /**
-        * Combine mutual edges.
+        * Reduce bitmasks to a single value using bitwise-or.
         *
         * @param <T> ID type
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
new file mode 100644
index 0000000..3b5452b
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.directed;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Graph;
+import 
org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient.Result;
+import org.apache.flink.graph.library.metric.directed.VertexMetrics;
+import org.apache.flink.types.CopyableValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * The global clustering coefficient measures the connectedness of a graph.
+ * Scores range from 0.0 (no triangles) to 1.0 (complete graph).
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class GlobalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Result> {
+
+       private TriangleCount<K, VV, EV> triangleCount;
+
+       private VertexMetrics<K, VV, EV> vertexMetrics;
+
+       // Optional configuration
+       private int littleParallelism = PARALLELISM_DEFAULT;
+
+       /**
+        * Override the parallelism of operators processing small amounts of 
data.
+        *
+        * @param littleParallelism operator parallelism
+        * @return this
+        */
+       public GlobalClusteringCoefficient<K, VV, EV> setLittleParallelism(int 
littleParallelism) {
+               this.littleParallelism = littleParallelism;
+
+               return this;
+       }
+
+       /*
+        * Implementation notes:
+        *
+        * The requirement that "K extends CopyableValue<K>" can be removed when
+        *   removed from TriangleListing.
+        */
+
+       @Override
+       public GlobalClusteringCoefficient<K, VV, EV> run(Graph<K, VV, EV> 
input)
+                       throws Exception {
+               super.run(input);
+
+               triangleCount = new TriangleCount<K, VV, EV>()
+                       .setLittleParallelism(littleParallelism);
+
+               input.run(triangleCount);
+
+               vertexMetrics = new VertexMetrics<K, VV, EV>()
+                       .setParallelism(littleParallelism);
+
+               input.run(vertexMetrics);
+
+               return this;
+       }
+
+       @Override
+       public Result getResult() {
+               // each triangle is counted once from each of its three vertices,
+               // matching the way each triplet is counted from its center vertex
+               long numberOfTriangles = 3 * triangleCount.getResult();
+
+               return new 
Result(vertexMetrics.getResult().getNumberOfTriplets(), numberOfTriangles);
+       }
+
+       /**
+        * Wraps global clustering coefficient metrics.
+        */
+       public static class Result {
+               private long tripletCount;
+
+               private long triangleCount;
+
+               /**
+                * Instantiate an immutable result.
+                *
+                * @param tripletCount triplet count
+                * @param triangleCount triangle count
+                */
+               public Result(long tripletCount, long triangleCount) {
+                       this.tripletCount = tripletCount;
+                       this.triangleCount = triangleCount;
+               }
+
+               /**
+                * Get the number of triplets.
+                *
+                * @return number of triplets
+                */
+               public long getNumberOfTriplets() {
+                       return tripletCount;
+               }
+
+               /**
+                * Get the number of triangles.
+                *
+                * @return number of triangles
+                */
+               public long getNumberOfTriangles() {
+                       return triangleCount;
+               }
+
+               /**
+                * Get the global clustering coefficient score. This is computed as the
+                * number of closed triplets (triangles) divided by the total number of
+                * triplets.
+                *
+                * A score of {@code Double.NaN} is returned for a graph of isolated vertices
+                * for which both the triangle count and number of neighbors are zero.
+                *
+                * @return global clustering coefficient score
+                */
+               public double getGlobalClusteringCoefficientScore() {
+                       return (tripletCount == 0) ? Double.NaN : triangleCount 
/ (double)tripletCount;
+               }
+
+               @Override
+               public String toString() {
+                       return "triplet count: " + tripletCount
+                               + ", triangle count: " + triangleCount
+                               + ", global clustering coefficient: " + 
getGlobalClusteringCoefficientScore();
+               }
+
+               @Override
+               public int hashCode() {
+                       return new HashCodeBuilder()
+                               .append(tripletCount)
+                               .append(triangleCount)
+                               .hashCode();
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj == null) { return false; }
+                       if (obj == this) { return true; }
+                       if (obj.getClass() != getClass()) { return false; }
+
+                       Result rhs = (Result)obj;
+
+                       return new EqualsBuilder()
+                               .append(tripletCount, rhs.tripletCount)
+                               .append(triangleCount, rhs.triangleCount)
+                               .isEquals();
+               }
+       }
+}

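To make the score formula concrete, a toy check using only the Result class added above (the wrapper class name is illustrative):

import org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient.Result;

public class ScoreSketch {

    public static void main(String[] args) {
        // 3 closed triplets out of 6 triplets -> 0.5
        System.out.println(new Result(6, 3).getGlobalClusteringCoefficientScore());

        // no triplets at all (isolated vertices) -> NaN instead of a division by zero
        System.out.println(new Result(0, 0).getGlobalClusteringCoefficientScore());
    }
}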
http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
new file mode 100644
index 0000000..1314771
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.directed;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * The local clustering coefficient measures the connectedness of each vertex's
+ * neighborhood. Scores range from 0.0 (no edges between neighbors) to 1.0
+ * (neighborhood is a clique).
+ * <br/>
+ * An edge between a vertex's neighbors is a triangle. Counting edges between
+ * neighbors is equivalent to counting the number of triangles which include
+ * the vertex.
+ * <br/>
+ * The input graph must be a simple graph containing no duplicate edges or
+ * self-loops.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class LocalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
+
+       // Optional configuration
+       private int littleParallelism = PARALLELISM_DEFAULT;
+
+       /**
+        * Override the parallelism of operators processing small amounts of data.
+        *
+        * @param littleParallelism operator parallelism
+        * @return this
+        */
+       public LocalClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
+               Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT,
+                       "The parallelism must be greater than zero.");
+
+               this.littleParallelism = littleParallelism;
+
+               return this;
+       }
+
+       /*
+        * Implementation notes:
+        *
+        * The requirement that "K extends CopyableValue<K>" can be removed when
+        *   removed from TriangleListing.
+        *
+        * CountTriangles can be replaced by ".sum(1)" when Flink aggregators use
+        *   code generation.
+        */
+
+       @Override
+       public DataSet<Result<K>> run(Graph<K, VV, EV> input)
+                       throws Exception {
+               // u, v, w, bitmask
+               DataSet<TriangleListing.Result<K>> triangles = input
+                       .run(new TriangleListing<K,VV,EV>()
+                               .setSortTriangleVertices(false)
+                               .setLittleParallelism(littleParallelism));
+
+               // u, edge count
+               DataSet<Tuple2<K, LongValue>> triangleVertices = triangles
+                       .flatMap(new SplitTriangles<K>())
+                               .name("Split triangle vertices");
+
+               // u, triangle count
+               DataSet<Tuple2<K, LongValue>> vertexTriangleCount = triangleVertices
+                       .groupBy(0)
+                       .reduce(new CountTriangles<K>())
+                               .name("Count triangles");
+
+               // u, deg(u)
+               DataSet<Vertex<K, Degrees>> vertexDegree = input
+                       .run(new VertexDegrees<K, VV, EV>()
+                               .setParallelism(littleParallelism)
+                               .setIncludeZeroDegreeVertices(true));
+
+               // u, deg(u), triangle count
+               return vertexDegree
+                       .leftOuterJoin(vertexTriangleCount)
+                       .where(0)
+                       .equalTo(0)
+                       .with(new JoinVertexDegreeWithTriangleCount<K>())
+                               .setParallelism(littleParallelism)
+                               .name("Clustering coefficient");
+       }
+
+       /**
+        * Emits the three vertex IDs comprising each triangle along with an initial count.
+        *
+        * @param <T> ID type
+        */
+       private static class SplitTriangles<T>
+       implements FlatMapFunction<TriangleListing.Result<T>, Tuple2<T, LongValue>> {
+               private LongValue one = new LongValue(1);
+
+               private LongValue two = new LongValue(2);
+
+               private Tuple2<T, LongValue> output = new Tuple2<>();
+
+               @Override
+               public void flatMap(TriangleListing.Result<T> value, Collector<Tuple2<T, LongValue>> out)
+                               throws Exception {
+                       byte bitmask = value.f3.getValue();
+
+                       output.f0 = value.f0;
+                       output.f1 = ((bitmask & 0b000011) == 0b000011) ? two : one;
+                       out.collect(output);
+
+                       output.f0 = value.f1;
+                       output.f1 = ((bitmask & 0b001100) == 0b001100) ? two : one;
+                       out.collect(output);
+
+                       output.f0 = value.f2;
+                       output.f1 = ((bitmask & 0b110000) == 0b110000) ? two : one;
+                       out.collect(output);
+               }
+       }
+
+       /**
+        * Sums the triangle count for each vertex ID.
+        *
+        * @param <T> ID type
+        */
+       @FunctionAnnotation.ForwardedFields("0")
+       private static class CountTriangles<T>
+       implements ReduceFunction<Tuple2<T, LongValue>> {
+               @Override
+               public Tuple2<T, LongValue> reduce(Tuple2<T, LongValue> left, Tuple2<T, LongValue> right)
+                               throws Exception {
+                       left.f1.setValue(left.f1.getValue() + right.f1.getValue());
+                       return left;
+               }
+       }
+
+       /**
+        * Joins the vertex and degree with the vertex's triangle count.
+        *
+        * @param <T> ID type
+        */
+       @FunctionAnnotation.ForwardedFieldsFirst("0; 1.0->1.0")
+       @FunctionAnnotation.ForwardedFieldsSecond("0")
+       private static class JoinVertexDegreeWithTriangleCount<T>
+       implements JoinFunction<Vertex<T, Degrees>, Tuple2<T, LongValue>, Result<T>> {
+               private LongValue zero = new LongValue(0);
+
+               private Result<T> output = new Result<>();
+
+               @Override
+               public Result<T> join(Vertex<T, Degrees> vertexAndDegree, Tuple2<T, LongValue> vertexAndTriangleCount)
+                               throws Exception {
+                       output.f0 = vertexAndDegree.f0;
+                       output.f1.f0 = vertexAndDegree.f1.f0;
+                       output.f1.f1 = (vertexAndTriangleCount == null) ? zero : vertexAndTriangleCount.f1;
+
+                       return output;
+               }
+       }
+
+       /**
+        * Wraps the vertex type to encapsulate results from the Local Clustering Coefficient algorithm.
+        *
+        * @param <T> ID type
+        */
+       public static class Result<T>
+       extends Vertex<T, Tuple2<LongValue, LongValue>> {
+               public static final int HASH_SEED = 0x37a208c4;
+
+               private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
+
+               /**
+                * No-args constructor.
+                */
+               public Result() {
+                       f1 = new Tuple2<>();
+               }
+
+               /**
+                * Get the vertex degree.
+                *
+                * @return vertex degree
+                */
+               public LongValue getDegree() {
+                       return f1.f0;
+               }
+
+               /**
+                * Get the number of triangles containing this vertex; equivalently,
+                * this is the number of edges between neighbors of this vertex.
+                *
+                * @return triangle count
+                */
+               public LongValue getTriangleCount() {
+                       return f1.f1;
+               }
+
+               /**
+                * Get the local clustering coefficient score. This is computed as the
+                * number of edges between neighbors, equal to the triangle count,
+                * divided by the number of potential edges between neighbors.
+                *
+                * A score of {@code Double.NaN} is returned for a vertex with a degree of
+                * 0 or 1, for which both the triangle count and the number of neighbor pairs are zero.
+                *
+                * @return local clustering coefficient score
+                */
+               public double getLocalClusteringCoefficientScore() {
+                       long degree = getDegree().getValue();
+                       long neighborPairs = degree * (degree - 1);
+
+                       return (neighborPairs == 0) ? Double.NaN : getTriangleCount().getValue() / (double)neighborPairs;
+               }
+
+               /**
+                * Format values into a human-readable string.
+                *
+                * @return verbose string
+                */
+               public String toVerboseString() {
+                       return "Vertex ID: " + f0
+                               + ", vertex degree: " + getDegree()
+                               + ", triangle count: " + getTriangleCount()
+                               + ", local clustering coefficient: " + getLocalClusteringCoefficientScore();
+               }
+
+               @Override
+               public int hashCode() {
+                       return hasher.reset()
+                               .hash(f0.hashCode())
+                               .hash(f1.f0.getValue())
+                               .hash(f1.f1.getValue())
+                               .hash();
+               }
+       }
+}
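
A sketch of the per-vertex algorithm (again not part of this patch), reusing graph from the example above. LocalClusteringCoefficient is a GraphAlgorithm, so it is run through Graph#run and returns a DataSet of per-vertex results; collect() triggers execution here.

    DataSet<LocalClusteringCoefficient.Result<LongValue>> scores = graph
        .run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>());

    // prints one line per vertex via toVerboseString()
    for (LocalClusteringCoefficient.Result<LongValue> result : scores.collect()) {
        System.out.println(result.toVerboseString());
    }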

http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleCount.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleCount.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleCount.java
new file mode 100644
index 0000000..9ba53f1
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleCount.java
@@ -0,0 +1,81 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.graph.library.clustering.directed;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.CountHelper;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.util.AbstractID;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Count the number of distinct triangles in a directed graph.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ * @see TriangleListing
+ */
+public class TriangleCount<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Long> {
+
+       private String id = new AbstractID().toString();
+
+       // Optional configuration
+       private int littleParallelism = PARALLELISM_DEFAULT;
+
+       /**
+        * Override the parallelism of operators processing small amounts of data.
+        *
+        * @param littleParallelism operator parallelism
+        * @return this
+        */
+       public TriangleCount<K, VV, EV> setLittleParallelism(int littleParallelism) {
+               this.littleParallelism = littleParallelism;
+
+               return this;
+       }
+
+       @Override
+       public TriangleCount<K, VV, EV> run(Graph<K, VV, EV> input)
+                       throws Exception {
+               super.run(input);
+
+               DataSet<TriangleListing.Result<K>> triangles = input
+                       .run(new TriangleListing<K, VV, EV>()
+                               .setSortTriangleVertices(false)
+                               .setLittleParallelism(littleParallelism));
+
+               triangles
+                       .output(new CountHelper<TriangleListing.Result<K>>(id))
+                               .name("Count triangles");
+
+               return this;
+       }
+
+       @Override
+       public Long getResult() {
+               return env.getLastJobExecutionResult().<Long>getAccumulatorResult(id);
+       }
+}
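
A usage sketch for the analytic (not part of this patch), again reusing graph and env from the first example. Since run() only attaches a counting sink, the count is read back from an accumulator after the job executes.

    TriangleCount<LongValue, NullValue, NullValue> triangleCount =
        new TriangleCount<LongValue, NullValue, NullValue>()
            .run(graph);

    env.execute("Triangle count");

    // each distinct triangle is counted exactly once
    long triangles = triangleCount.getResult();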

http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
new file mode 100644
index 0000000..5c364f5
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.directed;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeOrder;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Generates a listing of distinct triangles from the input graph.
+ * <br/>
+ * A triangle is a 3-clique with vertices A, B, and C connected by edges
+ * (A, B), (A, C), and (B, C).
+ * <br/>
+ * The input graph must not contain duplicate edges or self-loops.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> {
+
+       // Optional configuration
+       private boolean sortTriangleVertices = false;
+
+       private int littleParallelism = PARALLELISM_DEFAULT;
+
+       /**
+        * Normalize the triangle listing such that for each result (K0, K1, K2)
+        * the vertex IDs are sorted K0 < K1 < K2.
+        *
+        * @param sortTriangleVertices whether to output each triangle's vertices in sorted order
+        * @return this
+        */
+       public TriangleListing<K, VV, EV> setSortTriangleVertices(boolean sortTriangleVertices) {
+               this.sortTriangleVertices = sortTriangleVertices;
+
+               return this;
+       }
+
+       /**
+        * Override the parallelism of operators processing small amounts of data.
+        *
+        * @param littleParallelism operator parallelism
+        * @return this
+        */
+       public TriangleListing<K, VV, EV> setLittleParallelism(int littleParallelism) {
+               Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT,
+                       "The parallelism must be greater than zero.");
+
+               this.littleParallelism = littleParallelism;
+
+               return this;
+       }
+
+       /*
+        * Implementation notes:
+        *
+        * The requirement that "K extends CopyableValue<K>" can be removed when
+        *   Flink has a self-join and GenerateTriplets is implemented as such.
+        *
+        * ProjectTriangles should eventually be replaced by ".projectFirst("*")"
+        *   when projections use code generation.
+        */
+
+       @Override
+       public DataSet<Result<K>> run(Graph<K, VV, EV> input)
+                       throws Exception {
+               // u, v, bitmask where u < v
+               DataSet<Tuple3<K, K, ByteValue>> filteredByID = input
+                       .getEdges()
+                       .map(new OrderByID<K, EV>())
+                               .setParallelism(littleParallelism)
+                               .name("Order by ID")
+                       .groupBy(0, 1)
+                       .reduceGroup(new ReduceBitmask<K>())
+                               .setParallelism(littleParallelism)
+                               .name("Flatten by ID");
+
+               // u, v, (deg(u), deg(v))
+               DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> pairDegrees = input
+                       .run(new EdgeDegreesPair<K, VV, EV>()
+                               .setParallelism(littleParallelism));
+
+               // u, v, bitmask where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
+               DataSet<Tuple3<K, K, ByteValue>> filteredByDegree = pairDegrees
+                       .map(new OrderByDegree<K, EV>())
+                               .setParallelism(littleParallelism)
+                               .name("Order by degree")
+                       .groupBy(0, 1)
+                       .reduceGroup(new ReduceBitmask<K>())
+                               .setParallelism(littleParallelism)
+                               .name("Flatten by degree");
+
+               // u, v, w, bitmask where (u, v) and (u, w) are edges in graph
+               DataSet<Tuple4<K, K, K, ByteValue>> triplets = filteredByDegree
+                       .groupBy(0)
+                       .sortGroup(1, Order.ASCENDING)
+                       .reduceGroup(new GenerateTriplets<K>())
+                               .setParallelism(littleParallelism)
+                               .name("Generate triplets");
+
+               // u, v, w, bitmask where (u, v), (u, w), and (v, w) are edges in graph
+               DataSet<Result<K>> triangles = triplets
+                       .join(filteredByID, JoinOperatorBase.JoinHint.REPARTITION_HASH_SECOND)
+                       .where(1, 2)
+                       .equalTo(0, 1)
+                       .with(new ProjectTriangles<K>())
+                               .setParallelism(littleParallelism)
+                               .name("Triangle listing");
+
+               if (sortTriangleVertices) {
+                       triangles = triangles
+                               .map(new SortTriangleVertices<K>())
+                                       .name("Sort triangle vertices");
+               }
+
+               return triangles;
+       }
+
+       /**
+        * Removes edge values while emitting a Tuple3 where f0 and f1 are,
+        * respectively, the lesser and greater of the source and target IDs.
+        * The third field is a bitmask representing the vertex order.
+        *
+        * @param <T> ID type
+        * @param <ET> edge value type
+        */
+       private static final class OrderByID<T extends Comparable<T>, ET>
+       implements MapFunction<Edge<T, ET>, Tuple3<T, T, ByteValue>> {
+               private ByteValue forward = new ByteValue(EdgeOrder.FORWARD.getBitmask());
+
+               private ByteValue reverse = new ByteValue(EdgeOrder.REVERSE.getBitmask());
+
+               private Tuple3<T, T, ByteValue> output = new Tuple3<>();
+
+               @Override
+               public Tuple3<T, T, ByteValue> map(Edge<T, ET> value)
+                       if (value.f0.compareTo(value.f1) < 0) {
+                               output.f0 = value.f0;
+                               output.f1 = value.f1;
+                               output.f2 = forward;
+                       } else {
+                               output.f0 = value.f1;
+                               output.f1 = value.f0;
+                               output.f2 = reverse;
+                       }
+
+                       return output;
+               }
+       }
+
+       /**
+        * Reduce bitmasks to a single value using bitwise-or.
+        *
+        * @param <T> ID type
+        */
+       @ForwardedFields("0; 1")
+       private static final class ReduceBitmask<T>
+       implements GroupReduceFunction<Tuple3<T, T, ByteValue>, Tuple3<T, T, ByteValue>> {
+               @Override
+               public void reduce(Iterable<Tuple3<T, T, ByteValue>> values, Collector<Tuple3<T, T, ByteValue>> out)
+                               throws Exception {
+                       Tuple3<T, T, ByteValue> output = null;
+
+                       byte bitmask = 0;
+
+                       for (Tuple3<T, T, ByteValue> value: values) {
+                               output = value;
+                               bitmask |= value.f2.getValue();
+                       }
+
+                       output.f2.setValue(bitmask);
+                       out.collect(output);
+               }
+       }
+
+       /**
+        * Removes edge values while emitting a Tuple3 where f0 and f1 are,
+        * respectively, the lesser and greater of the source and target IDs
+        * by degree count. If the source and target vertex degrees are equal
+        * then the IDs are compared and emitted in order. The third field is
+        * a bitmask representing the vertex order.
+        *
+        * @param <T> ID type
+        * @param <ET> edge value type
+        */
+       private static final class OrderByDegree<T extends Comparable<T>, ET>
+       implements MapFunction<Edge<T, Tuple3<ET, Degrees, Degrees>>, Tuple3<T, T, ByteValue>> {
+               private ByteValue forward = new ByteValue((byte)(EdgeOrder.FORWARD.getBitmask() << 2));
+
+               private ByteValue reverse = new ByteValue((byte)(EdgeOrder.REVERSE.getBitmask() << 2));
+
+               private Tuple3<T, T, ByteValue> output = new Tuple3<>();
+
+               @Override
+               public Tuple3<T, T, ByteValue> map(Edge<T, Tuple3<ET, Degrees, Degrees>> value)
+                               throws Exception {
+                       Tuple3<ET, Degrees, Degrees> degrees = value.f2;
+                       long sourceDegree = degrees.f1.getDegree().getValue();
+                       long targetDegree = degrees.f2.getDegree().getValue();
+
+                       if (sourceDegree < targetDegree ||
+                                       (sourceDegree == targetDegree && value.f0.compareTo(value.f1) < 0)) {
+                               output.f0 = value.f0;
+                               output.f1 = value.f1;
+                               output.f2 = forward;
+                       } else {
+                               output.f0 = value.f1;
+                               output.f1 = value.f0;
+                               output.f2 = reverse;
+                       }
+
+                       return output;
+               }
+       }
+
+       /**
+        * Generates the set of triplets by the pairwise enumeration of the open
+        * neighborhood for each vertex. The number of triplets is quadratic in
+        * the vertex degree; however, data skew is minimized by only generating
+        * triplets from the vertex with least degree.
+        *
+        * @param <T> ID type
+        */
+       @ForwardedFields("0")
+       private static final class GenerateTriplets<T extends CopyableValue<T>>
+       implements GroupReduceFunction<Tuple3<T, T, ByteValue>, Tuple4<T, T, T, ByteValue>> {
+               private Tuple4<T, T, T, ByteValue> output = new Tuple4<>(null, null, null, new ByteValue());
+
+               private List<Tuple2<T, ByteValue>> visited = new ArrayList<>();
+
+               @Override
+               public void reduce(Iterable<Tuple3<T, T, ByteValue>> values, Collector<Tuple4<T, T, T, ByteValue>> out)
+                               throws Exception {
+                       int visitedCount = 0;
+
+                       Iterator<Tuple3<T, T, ByteValue>> iter = values.iterator();
+
+                       while (true) {
+                               Tuple3<T, T, ByteValue> edge = iter.next();
+                               byte bitmask = edge.f2.getValue();
+
+                               output.f0 = edge.f0;
+                               output.f2 = edge.f1;
+
+                               for (int i = 0; i < visitedCount; i++) {
+                                       Tuple2<T, ByteValue> previous = visited.get(i);
+
+                                       output.f1 = previous.f0;
+                                       output.f3.setValue((byte)(previous.f1.getValue() | bitmask));
+
+                                       // u, v, w, bitmask
+                                       out.collect(output);
+                               }
+
+                               if (! iter.hasNext()) {
+                                       break;
+                               }
+
+                               byte shiftedBitmask = (byte)(bitmask << 2);
+
+                               if (visitedCount == visited.size()) {
+                                       visited.add(new Tuple2<>(edge.f1.copy(), new ByteValue(shiftedBitmask)));
+                               } else {
+                                       Tuple2<T, ByteValue> update = visited.get(visitedCount);
+                                       edge.f1.copyTo(update.f0);
+                                       update.f1.setValue(shiftedBitmask);
+                               }
+
+                               visitedCount += 1;
+                       }
+               }
+       }
+
+       /**
+        * Simply project the triplet as a triangle while collapsing triplet and edge bitmasks.
+        *
+        * @param <T> ID type
+        */
+       @ForwardedFieldsFirst("0; 1; 2")
+       @ForwardedFieldsSecond("0; 1")
+       private static final class ProjectTriangles<T>
+       implements JoinFunction<Tuple4<T, T, T, ByteValue>, Tuple3<T, T, ByteValue>, Result<T>> {
+               private Result<T> output = new Result<>(null, null, null, new ByteValue());
+
+               @Override
+               public Result<T> join(Tuple4<T, T, T, ByteValue> triplet, Tuple3<T, T, ByteValue> edge)
+                               throws Exception {
+                       output.f0 = triplet.f0;
+                       output.f1 = triplet.f1;
+                       output.f2 = triplet.f2;
+                       output.f3.setValue((byte)(triplet.f3.getValue() | edge.f2.getValue()));
+                       return output;
+               }
+       }
+
+       /**
+        * Reorders the vertices of each emitted triangle (K0, K1, K2, bitmask)
+        * into sorted order such that K0 < K1 < K2.
+        *
+        * @param <T> ID type
+        */
+       private static final class SortTriangleVertices<T extends Comparable<T>>
+       implements MapFunction<Result<T>, Result<T>> {
+               @Override
+               public Result<T> map(Result<T> value)
+                               throws Exception {
+                       // by the triangle listing algorithm we know f1 < f2
+                       if (value.f0.compareTo(value.f1) > 0) {
+                               byte bitmask = value.f3.getValue();
+
+                               T temp_val = value.f0;
+                               value.f0 = value.f1;
+
+                               if (temp_val.compareTo(value.f2) < 0) {
+                                       value.f1 = temp_val;
+
+                                       int f0f1 = ((bitmask & 0b100000) >>> 1) | ((bitmask & 0b010000) << 1);
+                                       int f0f2 = (bitmask & 0b001100) >>> 2;
+                                       int f1f2 = (bitmask & 0b000011) << 2;
+
+                                       value.f3.setValue((byte)(f0f1 | f0f2 | f1f2));
+                               } else {
+                                       value.f1 = value.f2;
+                                       value.f2 = temp_val;
+
+                                       int f0f1 = (bitmask & 0b000011) << 4;
+                                       int f0f2 = ((bitmask & 0b100000) >>> 3) | ((bitmask & 0b010000) >>> 1);
+                                       int f1f2 = ((bitmask & 0b001000) >>> 3) | ((bitmask & 0b000100) >>> 1);
+
+                                       value.f3.setValue((byte)(f0f1 | f0f2 | f1f2));
+                               }
+                       }
+
+                       return value;
+               }
+       }
+
+       /**
+        * Wraps the tuple type to encapsulate results from the Triangle Listing algorithm.
+        *
+        * @param <T> ID type
+        */
+       public static class Result<T>
+       extends Tuple4<T, T, T, ByteValue> {
+               /**
+                * No-args constructor.
+                */
+               public Result() {}
+
+               /**
+                * Populates parent tuple with constructor parameters.
+                *
+                * @param value0 1st triangle vertex ID
+                * @param value1 2nd triangle vertex ID
+                * @param value2 3rd triangle vertex ID
+                * @param value3 bitmask indicating presence of six possible edges between triangle vertices
+                */
+               public Result(T value0, T value1, T value2, ByteValue value3) {
+                       super(value0, value1, value2, value3);
+               }
+
+               /**
+                * Format values into a human-readable string.
+                *
+                * @return verbose string
+                */
+               public String toVerboseString() {
+                       byte bitmask = f3.getValue();
+
+                       return "1st vertex ID: " + f0
+                               + ", 2nd vertex ID: " + f1
+                               + ", 3rd vertex ID: " + f2
+                               + ", edge directions: " + f0 + maskToString(bitmask, 4) + f1
+                               + ", " + f0 + maskToString(bitmask, 2) + f2
+                               + ", " + f1 + maskToString(bitmask, 0) + f2;
+               }
+
+               private String maskToString(byte mask, int shift) {
+                       switch((mask >>> shift) & 0b000011) {
+                               case 0b01:
+                                       // EdgeOrder.FORWARD
+                                       return "->";
+                               case 0b10:
+                                       // EdgeOrder.REVERSE
+                                       return "<-";
+                               case 0b11:
+                                       // EdgeOrder.MUTUAL
+                                       return "<->";
+                               default:
+                                       throw new IllegalArgumentException("Bitmask is missing an edge (mask = "
+                                               + mask + ", shift = " + shift + ")");
+                       }
+               }
+       }
+}
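
A brief sketch of listing triangles with their edge-direction bitmask (not part of this patch), reusing graph from the first example. Each Result holds the three vertex IDs plus a six-bit mask, two bits per vertex pair, which toVerboseString() renders as ->, <-, or <->.

    DataSet<TriangleListing.Result<LongValue>> listing = graph
        .run(new TriangleListing<LongValue, NullValue, NullValue>()
            .setSortTriangleVertices(true));

    for (TriangleListing.Result<LongValue> triangle : listing.collect()) {
        System.out.println(triangle.toVerboseString());
    }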

http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
index fc89e43..c11cb3c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
@@ -58,6 +58,13 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                return this;
        }
 
+       /*
+        * Implementation notes:
+        *
+        * The requirement that "K extends CopyableValue<K>" can be removed when
+        *   removed from TriangleListing.
+        */
+
        @Override
        public GlobalClusteringCoefficient<K, VV, EV> run(Graph<K, VV, EV> input)
                        throws Exception {
@@ -78,7 +85,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
        @Override
        public Result getResult() {
-               return new Result(vertexMetrics.getResult().getNumberOfTriplets(), 3 * triangleCount.getResult());
+               // each triangle is counted from each of the three vertices
+               long numberOfTriangles = 3 * triangleCount.getResult();
+
+               return new Result(vertexMetrics.getResult().getNumberOfTriplets(), numberOfTriangles);
        }
 
        /**
@@ -86,8 +96,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
         */
        public static class Result {
                private long tripletCount;
+
                private long triangleCount;
 
+               /**
+                * Instantiate an immutable result.
+                *
+                * @param tripletCount triplet count
+                * @param triangleCount triangle count
+                */
                public Result(long tripletCount, long triangleCount) {
                        this.tripletCount = tripletCount;
                        this.triangleCount = triangleCount;
@@ -121,13 +138,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                 *
                 * @return global clustering coefficient score
                 */
-               public double getLocalClusteringCoefficientScore() {
+               public double getGlobalClusteringCoefficientScore() {
                        return (tripletCount == 0) ? Double.NaN : triangleCount / (double)tripletCount;
                }
 
                @Override
                public String toString() {
-                       return "triplet count: " + tripletCount + ", triangle count:" + triangleCount;
+                       return "triplet count: " + tripletCount
+                               + ", triangle count: " + triangleCount
+                               + ", global clustering coefficient: " + getGlobalClusteringCoefficientScore();
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index bc62d36..6858818 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -102,7 +102,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
                // u, triangle count
                DataSet<Tuple2<K, LongValue>> vertexTriangleCount = triangleVertices
                        .groupBy(0)
-                       .reduce(new CountVertices<K>())
+                       .reduce(new CountTriangles<K>())
                                .name("Count triangles");
 
                // u, deg(u)
@@ -145,12 +145,12 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
        }
 
        /**
-        * Combines the count of each vertex ID.
+        * Sums the triangle count for each vertex ID.
         *
         * @param <T> ID type
         */
        @FunctionAnnotation.ForwardedFields("0")
-       private static class CountVertices<T>
+       private static class CountTriangles<T>
        implements ReduceFunction<Tuple2<T, LongValue>> {
                @Override
                public Tuple2<T, LongValue> reduce(Tuple2<T, LongValue> left, Tuple2<T, LongValue> right)
@@ -185,7 +185,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
        }
 
        /**
-        * Wraps the vertex type to encapsulate results from the local clustering coefficient algorithm.
+        * Wraps the vertex type to encapsulate results from the Local Clustering Coefficient algorithm.
         *
         * @param <T> ID type
         */
@@ -195,9 +195,6 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
 
                private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
 
-               /**
-                * The no-arg constructor instantiates contained objects.
-                */
                public Result() {
                        f1 = new Tuple2<>();
                }
@@ -238,6 +235,11 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
                        return (neighborPairs == 0) ? Double.NaN : getTriangleCount().getValue() / (double)neighborPairs;
                }
 
+               /**
+                * Format values into a human-readable string.
+                *
+                * @return verbose string
+                */
                public String toVerboseString() {
                        return "Vertex ID: " + f0
                                + ", vertex degree: " + getDegree()

http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java
index bc43725..46e1875 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java
@@ -66,7 +66,9 @@ extends AbstractGraphAnalytic<K, VV, EV, Long> {
                                .setSortTriangleVertices(false)
                                .setLittleParallelism(littleParallelism));
 
-               triangles.output(new CountHelper<Tuple3<K, K, K>>(id)).name("Count triangles");
+               triangles
+                       .output(new CountHelper<Tuple3<K, K, K>>(id))
+                               .name("Count triangles");
 
                return this;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
index 6245433..4f8ce7a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
@@ -220,6 +220,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
         *
         * @param <T> ID type
         */
+       @ForwardedFields("0")
        private static final class GenerateTriplets<T extends CopyableValue<T>>
        implements GroupReduceFunction<Tuple2<T, T>, Tuple3<T, T, T>> {
                private Tuple3<T, T, T> output = new Tuple3<>();
@@ -269,9 +270,9 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
        private static final class ProjectTriangles<T>
        implements JoinFunction<Tuple3<T, T, T>, Tuple2<T, T>, Tuple3<T, T, T>> {
                @Override
-               public Tuple3<T, T, T> join(Tuple3<T, T, T> first, Tuple2<T, T> second)
+               public Tuple3<T, T, T> join(Tuple3<T, T, T> triplet, Tuple2<T, T> edge)
                                throws Exception {
-                       return first;
+                       return triplet;
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
new file mode 100644
index 0000000..1c7df7e
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
@@ -0,0 +1,238 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.graph.library.metric.directed;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.library.metric.directed.VertexMetrics.Result;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.util.AbstractID;
+
+import java.io.IOException;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Compute the number of vertices, number of edges, and number of triplets in
+ * a directed graph.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class VertexMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Result> {
+
+       private String id = new AbstractID().toString();
+
+       // Optional configuration
+       private boolean includeZeroDegreeVertices = false;
+
+       private int parallelism = PARALLELISM_DEFAULT;
+
+       /**
+        * By default only the edge set is processed for the computation of degree.
+        * When this flag is set an additional join is performed against the vertex
+        * set in order to output vertices with a degree of zero.
+        *
+        * @param includeZeroDegreeVertices whether to output vertices with a
+        *                                  degree of zero
+        * @return this
+        */
+       public VertexMetrics<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
+               this.includeZeroDegreeVertices = includeZeroDegreeVertices;
+
+               return this;
+       }
+
+       /**
+        * Override the operator parallelism.
+        *
+        * @param parallelism operator parallelism
+        * @return this
+        */
+       public VertexMetrics<K, VV, EV> setParallelism(int parallelism) {
+               this.parallelism = parallelism;
+
+               return this;
+       }
+
+       @Override
+       public VertexMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
+                       throws Exception {
+               super.run(input);
+
+               DataSet<Vertex<K, Degrees>> vertexDegree = input
+                       .run(new VertexDegrees<K, VV, EV>()
+                               .setIncludeZeroDegreeVertices(includeZeroDegreeVertices)
+                               .setParallelism(parallelism));
+
+               vertexDegree
+                       .output(new VertexMetricsHelper<K>(id))
+                               .name("Vertex metrics");
+
+               return this;
+       }
+
+       @Override
+       public Result getResult() {
+               JobExecutionResult res = env.getLastJobExecutionResult();
+
+               long vertexCount = res.getAccumulatorResult(id + "-0");
+               long edgeCount = res.getAccumulatorResult(id + "-1");
+               long tripletCount = res.getAccumulatorResult(id + "-2");
+
+               return new Result(vertexCount, edgeCount / 2, tripletCount);
+       }
+
+       /**
+        * Helper class to collect vertex metrics.
+        *
+        * @param <T> ID type
+        */
+       private static class VertexMetricsHelper<T>
+       extends RichOutputFormat<Vertex<T, Degrees>> {
+               private final String id;
+
+               private long vertexCount;
+               private long edgeCount;
+               private long tripletCount;
+
+               /**
+                * This helper class collects vertex metrics by scanning over and
+                * discarding elements from the given DataSet.
+                *
+                * The unique id is required because Flink's accumulator namespace is
+                * shared among all operators.
+                *
+                * @param id unique string used for accumulator names
+                */
+               public VertexMetricsHelper(String id) {
+                       this.id = id;
+               }
+
+               @Override
+               public void configure(Configuration parameters) {}
+
+               @Override
+               public void open(int taskNumber, int numTasks) throws IOException {}
+
+               @Override
+               public void writeRecord(Vertex<T, Degrees> record) throws IOException {
+                       long degree = record.f1.getDegree().getValue();
+                       long outDegree = record.f1.getOutDegree().getValue();
+
+                       vertexCount++;
+                       edgeCount += outDegree;
+                       tripletCount += degree * (degree - 1) / 2;
+               }
+
+               @Override
+               public void close() throws IOException {
+                       getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount));
+                       getRuntimeContext().addAccumulator(id + "-1", new LongCounter(edgeCount));
+                       getRuntimeContext().addAccumulator(id + "-2", new LongCounter(tripletCount));
+               }
+       }
+
+       /**
+        * Wraps vertex metrics.
+        */
+       public static class Result {
+               private long vertexCount;
+               private long edgeCount;
+               private long tripletCount;
+
+               public Result(long vertexCount, long edgeCount, long tripletCount) {
+                       this.vertexCount = vertexCount;
+                       this.edgeCount = edgeCount;
+                       this.tripletCount = tripletCount;
+               }
+
+               /**
+                * Get the number of vertices.
+                *
+                * @return number of vertices
+                */
+               public long getNumberOfVertices() {
+                       return vertexCount;
+               }
+
+               /**
+                * Get the number of edges.
+                *
+                * @return number of edges
+                */
+               public long getNumberOfEdges() {
+                       return edgeCount;
+               }
+
+               /**
+                * Get the number of triplets.
+                *
+                * @return number of triplets
+                */
+               public long getNumberOfTriplets() {
+                       return tripletCount;
+               }
+
+               @Override
+               public String toString() {
+                       return "vertex count: " + vertexCount
+                               + ", edge count: " + edgeCount
+                               + ", triplet count: " + tripletCount;
+               }
+
+               @Override
+               public int hashCode() {
+                       return new HashCodeBuilder()
+                               .append(vertexCount)
+                               .append(edgeCount)
+                               .append(tripletCount)
+                               .hashCode();
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj == null) { return false; }
+                       if (obj == this) { return true; }
+                       if (obj.getClass() != getClass()) { return false; }
+
+                       Result rhs = (Result)obj;
+
+                       return new EqualsBuilder()
+                               .append(vertexCount, rhs.vertexCount)
+                               .append(edgeCount, rhs.edgeCount)
+                               .append(tripletCount, rhs.tripletCount)
+                               .isEquals();
+               }
+       }
+}
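
A short sketch of the directed VertexMetrics analytic (not part of this patch), reusing graph and env from the first example. The three counts are published as accumulators and read back after execution.

    VertexMetrics<LongValue, NullValue, NullValue> metrics =
        new VertexMetrics<LongValue, NullValue, NullValue>()
            .setIncludeZeroDegreeVertices(true)
            .run(graph);

    env.execute("Vertex metrics");

    // vertex count, edge count, and triplet count
    System.out.println(metrics.getResult());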

http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
index 41ae27a..3c26e43 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
@@ -111,7 +111,9 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
                                .setReduceOnTargetId(reduceOnTargetId)
                                .setParallelism(parallelism));
 
-               vertexDegree.output(new VertexMetricsHelper<K>(id)).name("Vertex metrics");
+               vertexDegree
+                       .output(new VertexMetricsHelper<K>(id))
+                               .name("Vertex metrics");
 
                return this;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d34bdaf7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java
index 98f2b97..6c3b5ab 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java
@@ -67,11 +67,11 @@ public class Murmur3_32 implements Serializable {
                count++;
 
                input *= 0xcc9e2d51;
-               input = input << 15;
+               input = Integer.rotateLeft(input, 15);
                input *= 0x1b873593;
 
                hash ^= input;
-               hash = hash << 13;
+               hash = Integer.rotateLeft(hash, 13);
                hash = hash * 5 + 0xe6546b64;
 
                return this;
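
For illustration (not part of this patch), the difference between the old shift and the new rotation: a plain left shift discards the high-order bits, while MurmurHash3's mixing requires a 32-bit rotation that wraps them around.

    int k = 0x80000001;
    System.out.println(Integer.toHexString(k << 15));                   // 8000 (high bit lost)
    System.out.println(Integer.toHexString(Integer.rotateLeft(k, 15))); // c000 (high bit wrapped to bit 14)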
