Repository: flink
Updated Branches:
  refs/heads/master 7160a6812 -> 8da1a75ce


[FLINK-3906] [gelly] Global Clustering Coefficient

The global clustering coefficient measures the connectedness of a graph.
Scores range from 0.0 (no triangles) to 1.0 (complete graph).

This closes #1997


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8da1a75c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8da1a75c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8da1a75c

Branch: refs/heads/master
Commit: 8da1a75ceb30ef1bce27a2426dab3a0f66b94b64
Parents: 7160a68
Author: Greg Hogan <c...@greghogan.com>
Authored: Tue May 17 10:02:47 2016 -0400
Committer: Greg Hogan <c...@greghogan.com>
Committed: Tue Jun 7 12:26:15 2016 -0400

----------------------------------------------------------------------
 docs/apis/batch/libs/gelly.md                   |  38 ++-
 .../graph/library/TriangleCountITCase.java      |  53 ----
 .../org/apache/flink/graph/scala/Graph.scala    |  18 +-
 .../flink/graph/AbstractGraphAnalytic.java      |  62 +++++
 .../main/java/org/apache/flink/graph/Graph.java |  32 ++-
 .../org/apache/flink/graph/GraphAnalytic.java   |  74 ++++++
 .../flink/graph/library/GSATriangleCount.java   | 192 --------------
 .../undirected/GlobalClusteringCoefficient.java | 155 ++++++++++++
 .../undirected/LocalClusteringCoefficient.java  |   2 +-
 .../clustering/undirected/TriangleCount.java    |  78 ++++++
 .../clustering/undirected/TriangleListing.java  |   4 +-
 .../metric/undirected/VertexMetrics.java        | 251 +++++++++++++++++++
 .../GlobalClusteringCoefficientTest.java        |  84 +++++++
 .../LocalClusteringCoefficientTest.java         |   8 +-
 .../undirected/TriangleCountTest.java           |  75 ++++++
 .../metric/undirected/VertexMetricsTest.java    |  97 +++++++
 16 files changed, 954 insertions(+), 269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index 7adff04..45fdbe5 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -1828,12 +1828,13 @@ Gelly has a growing collection of graph algorithms for 
easily analyzing large-sc
 * [GSA PageRank](#gsa-pagerank)
 * [Single Source Shortest Paths](#single-source-shortest-paths)
 * [GSA Single Source Shortest Paths](#gsa-single-source-shortest-paths)
-* [GSA Triangle Count](#gsa-triangle-count)
+* [Triangle Count](#triangle-count)
 * [Triangle Enumerator](#triangle-enumerator)
 * [Hyperlink-Induced Topic Search](#hyperlink-induced-topic-search)
 * [Summarization](#summarization)
 * [Jaccard Index](#jaccard-index)
 * [Local Clustering Coefficient](#local-clustering-coefficient)
+* [Global Clustering Coefficient](#global-clustering-coefficient)
 
 Gelly's library methods can be used by simply calling the `run()` method on 
the input graph:
 
@@ -1997,19 +1998,23 @@ The algorithm is implemented using [gather-sum-apply 
iterations](#gather-sum-app
 
 See the [Single Source Shortest Paths](#single-source-shortest-paths) library 
method for implementation details and usage information.
 
-### GSA Triangle Count
+### Triangle Count
 
 #### Overview
-An implementation of the Triangle Count algorithm. Given an input graph, it 
returns the number of unique triangles in it.
+An analytic for counting the number of unique triangles in a graph.
 
 #### Details
-This algorithm operates in three phases. First, vertices select neighbors with 
IDs greater than theirs
-and send messages to them. Each received message is then propagated to 
neighbors with higher IDs.
-Finally, if a node encounters the target ID in the list of received messages, 
it increments the number of discovered triangles.
+Counts the triangles generated by [Triangle Listing](#triangle-listing).
 
 #### Usage
-The algorithm takes an undirected, unweighted graph as input and outputs a 
`DataSet` which contains a single integer corresponding to the number of 
triangles
-in the graph. The algorithm constructor takes no arguments.
+The analytic takes an undirected graph as input and returns as a result a 
`Long` corresponding to the number of triangles
+in the graph. The graph ID type must be `Comparable` and `Copyable`.
+
+### Triangle Listing
+
+This algorithm supports object reuse. The graph ID type must be `Comparable` 
and `Copyable`.
+
+See the [Triangle Enumerator](#triangle-enumerator) library method for 
implementation details.
 
 ### Triangle Enumerator
 
@@ -2108,7 +2113,22 @@ See the [Triangle Enumeration](#triangle-enumeration) 
library method for a detai
 
 #### Usage
 The algorithm takes a simple, undirected graph as input and outputs a 
`DataSet` of tuples containing the vertex ID,
-vertex degree, and number of triangles containing the vertex. The vertex ID 
must be `Comparable` and `Copyable`.
+vertex degree, and number of triangles containing the vertex. The graph ID 
type must be `Comparable` and `Copyable`.
+
+### Global Clustering Coefficient
+
+#### Overview
+The global clustering coefficient measures the connectedness of a graph. 
Scores range from 0.0 (no edges between
+neighbors) to 1.0 (complete graph).
+
+#### Details
+See the [Local Clustering Coefficient](#local-clustering-coefficient) library 
method for a detailed explanation of
+clustering coefficient.
+
+#### Usage
+The algorithm takes a simple, undirected graph as input and outputs a result 
containing the total number of triplets and
+triangles in the graph. The graph ID type must be `Comparable` and `Copyable`.
+
 
 {% top %}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java
deleted file mode 100644
index aaada8f..0000000
--- 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.examples.data.TriangleCountData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.NullValue;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class TriangleCountITCase extends MultipleProgramsTestBase {
-
-       public TriangleCountITCase(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       @Test
-       public void testGSATriangleCount() throws Exception {
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, NullValue, NullValue> graph = 
Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env),
-                               env).getUndirected();
-
-               List<Integer> numberOfTriangles = graph.run(new 
GSATriangleCount<Long, NullValue, NullValue>()).collect();
-               String expectedResult = 
TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES;
-
-               Assert.assertEquals(numberOfTriangles.get(0).intValue(), 
Integer.parseInt(expectedResult));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
 
b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index f31619d..3881aae 100644
--- 
a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ 
b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -1103,19 +1103,35 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, 
EV]) {
    * @return a Dataset of Tuple2, with one tuple per vertex.
    * The first field of the Tuple2 is the vertex ID and the second field
    * is the aggregate value computed by the provided 
[[ReduceNeighborsFunction]].
-  */
+   */
   def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV], direction: 
EdgeDirection):
   DataSet[(K, EV)] = {
     wrap(jgraph.reduceOnEdges(reduceEdgesFunction, direction)).map(jtuple => 
(jtuple.f0,
       jtuple.f1))
   }
 
+  /**
+   * @param algorithm the algorithm to run on the Graph
+   * @return the result of the graph algorithm
+   */
   def run[T: TypeInformation : ClassTag](algorithm: GraphAlgorithm[K, VV, EV, 
T]):
   T = {
     jgraph.run(algorithm)
   }
 
   /**
+   * A GraphAnalytic is similar to a GraphAlgorithm but is terminal and results
+   * are retrieved via accumulators.  A Flink program has a single point of
+   * execution. A GraphAnalytic defers execution to the user to allow composing
+   * multiple analytics and algorithms into a single program.
+   *
+   * @param analytic the analytic to run on the Graph
+   */
+  def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, 
T])= {
+    jgraph.run(analytic)
+  }
+
+  /**
    * Runs a scatter-gather iteration on the graph.
    * No configuration options are provided.
    *

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
new file mode 100644
index 0000000..b13e82e
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class for {@link GraphAnalytic}.
+ *
+ * @param <K> key type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ * @param <T> the return type
+ */
+public abstract class AbstractGraphAnalytic<K, VV, EV, T>
+implements GraphAnalytic<K, VV, EV, T> {
+
+       protected ExecutionEnvironment env;
+
+       @Override
+       public GraphAnalytic<K, VV, EV, T> run(Graph<K, VV, EV> input)
+                       throws Exception {
+               env = input.getContext();
+               return null;
+       }
+
+       @Override
+       public T execute()
+                       throws Exception {
+               Preconditions.checkNotNull(env);
+
+               env.execute();
+               return getResult();
+       }
+
+       @Override
+       public T execute(String jobName)
+                       throws Exception {
+               Preconditions.checkNotNull(jobName);
+               Preconditions.checkNotNull(env);
+
+               env.execute(jobName);
+               return getResult();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index b17f7a5..dd25cfd 100755
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -18,20 +18,13 @@
 
 package org.apache.flink.graph;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.List;
-import java.util.Arrays;
-
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
@@ -71,6 +64,13 @@ import org.apache.flink.graph.validation.GraphValidator;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
 /**
  * Represents a Graph consisting of {@link Edge edges} and {@link Vertex
  * vertices}.
@@ -1788,6 +1788,20 @@ public class Graph<K, VV, EV> {
        }
 
        /**
+        * A {@code GraphAnalytic} is similar to a {@link GraphAlgorithm} but 
is terminal
+        * and results are retrieved via accumulators.  A Flink program has a 
single
+        * point of execution. A {@code GraphAnalytic} defers execution to the 
user to
+        * allow composing multiple analytics and algorithms into a single 
program.
+        *
+        * @param analytic the analytic to run on the Graph
+        * @param <T> the result type
+        * @throws Exception
+        */
+       public <T> void run(GraphAnalytic<K, VV, EV, T> analytic) throws 
Exception {
+               analytic.run(this);
+       }
+
+       /**
         * Groups by vertex and computes a GroupReduce transformation over the 
neighbors (both edges and vertices)
         * of each vertex. The neighborsFunction applied on the neighbors only 
has access to both the vertex id
         * and the vertex value of the grouping vertex.

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
new file mode 100644
index 0000000..dd221dc
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+
+/**
+ * A {@code GraphAnalytic} is similar to a {@link GraphAlgorithm} but is 
terminal
+ * and results are retrieved via accumulators.  A Flink program has a single
+ * point of execution. A {@code GraphAnalytic} defers execution to the user to
+ * allow composing multiple analytics and algorithms into a single program.
+ *
+ * @param <K> key type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ * @param <T> the return type
+ */
+public interface GraphAnalytic<K, VV, EV, T> {
+
+       /**
+        * This method must be called after the program has executed:
+        *  1) "run" analytics and algorithms
+        *  2) call ExecutionEnvironment.execute()
+        *  3) get analytics results
+        *
+        * @return the result
+        */
+       T getResult();
+
+       /**
+        * Execute the program and return the result.
+        *
+        * @return the result
+        * @throws Exception
+        */
+       T execute() throws Exception;
+
+       /**
+        * Execute the program and return the result.
+        *
+        * @param jobName the name to assign to the job
+        * @return the result
+        * @throws Exception
+        */
+       T execute(String jobName) throws Exception;
+
+       /**
+        * All {@code GraphAnalytic} processing must be terminated by an
+        * {@link OutputFormat}. Rather than obtained via accumulators rather 
than
+        * returned by a {@link DataSet}.
+        *
+        * @param input input graph
+        * @return this
+        * @throws Exception
+        */
+       GraphAnalytic<K, VV, EV, T> run(Graph<K, VV, EV> input) throws 
Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
deleted file mode 100644
index 1eafce2..0000000
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.ReduceNeighborsFunction;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Triplet;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.VertexJoinFunction;
-import org.apache.flink.types.NullValue;
-
-import java.util.TreeMap;
-
-/**
- * Triangle Count Algorithm.
- *
- * This algorithm operates in three phases. First, vertices select neighbors 
with id greater than theirs
- * and send messages to them. Each received message is then propagated to 
neighbors with higher id.
- * Finally, if a node encounters the target id in the list of received 
messages, it increments the number
- * of triangles found.
- *
- * This implementation is non - iterative.
- *
- * The algorithm takes an undirected, unweighted graph as input and outputs a 
DataSet
- * which contains a single integer representing the number of triangles.
- */
-public class GSATriangleCount<K extends Comparable<K>, VV, EV> implements
-               GraphAlgorithm<K, VV, EV, DataSet<Integer>> {
-
-       @SuppressWarnings("serial")
-       @Override
-       public DataSet<Integer> run(Graph<K, VV, EV> input) throws Exception {
-
-               ExecutionEnvironment env = input.getContext();
-
-               // order the edges so that src is always higher than trg
-               DataSet<Edge<K, NullValue>> edges = input.getEdges().map(new 
OrderEdges<K, EV>()).distinct();
-
-               Graph<K, TreeMap<K, Integer>, NullValue> graph = 
Graph.fromDataSet(edges,
-                               new VertexInitializer<K>(), env);
-
-               // select neighbors with ids higher than the current vertex id
-               // Gather: a no-op in this case
-               // Sum: create the set of neighbors
-               DataSet<Tuple2<K, TreeMap<K, Integer>>> higherIdNeighbors =
-                               graph.reduceOnNeighbors(new 
GatherHigherIdNeighbors<K>(), EdgeDirection.IN);
-
-               Graph<K, TreeMap<K, Integer>, NullValue> 
graphWithReinitializedVertexValues =
-                               graph.mapVertices(new 
VertexInitializerEmptyTreeMap<K>());
-
-               // Apply: attach the computed values to the vertices
-               // joinWithVertices to update the node values
-               DataSet<Vertex<K, TreeMap<K, Integer>>> 
verticesWithHigherIdNeighbors =
-                               
graphWithReinitializedVertexValues.joinWithVertices(higherIdNeighbors, new 
AttachValues<K>()).getVertices();
-
-               Graph<K, TreeMap<K,Integer>, NullValue> graphWithNeighbors = 
Graph.fromDataSet(verticesWithHigherIdNeighbors,
-                               edges, env);
-
-               // propagate each received value to neighbors with higher id
-               // Gather: a no-op in this case
-               // Sum: propagate values
-               DataSet<Tuple2<K, TreeMap<K, Integer>>> propagatedValues = 
graphWithNeighbors
-                               .reduceOnNeighbors(new 
GatherHigherIdNeighbors<K>(), EdgeDirection.IN);
-
-               // Apply: attach propagated values to vertices
-               DataSet<Vertex<K, TreeMap<K, Integer>>> 
verticesWithPropagatedValues =
-                               
graphWithReinitializedVertexValues.joinWithVertices(propagatedValues, new 
AttachValues<K>()).getVertices();
-
-               Graph<K, TreeMap<K, Integer>, NullValue> 
graphWithPropagatedNeighbors =
-                               Graph.fromDataSet(verticesWithPropagatedValues, 
graphWithNeighbors.getEdges(), env);
-
-               // Scatter: compute the number of triangles
-               DataSet<Integer> numberOfTriangles = 
graphWithPropagatedNeighbors.getTriplets()
-                               .map(new ComputeTriangles<K>()).reduce(new 
ReduceFunction<Integer>() {
-
-                                       @Override
-                                       public Integer reduce(Integer first, 
Integer second) throws Exception {
-                                               return first + second;
-                                       }
-                               });
-
-               return numberOfTriangles;
-       }
-
-       @SuppressWarnings("serial")
-       private static final class OrderEdges<K extends Comparable<K>, EV> 
implements
-               MapFunction<Edge<K, EV>, Edge<K, NullValue>> {
-
-               @Override
-               public Edge<K, NullValue> map(Edge<K, EV> edge) throws 
Exception {
-                       if (edge.getSource().compareTo(edge.getTarget()) < 0) {
-                               return new Edge<K, NullValue>(edge.getTarget(), 
edge.getSource(), NullValue.getInstance());
-                       } else {
-                               return new Edge<K, NullValue>(edge.getSource(), 
edge.getTarget(), NullValue.getInstance());
-                       }
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class VertexInitializer<K> implements 
MapFunction<K, TreeMap<K, Integer>> {
-
-               @Override
-               public TreeMap<K, Integer> map(K value) throws Exception {
-                       TreeMap<K, Integer> neighbors = new TreeMap<K, 
Integer>();
-                       neighbors.put(value, 1);
-
-                       return neighbors;
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class VertexInitializerEmptyTreeMap<K> implements
-                       MapFunction<Vertex<K, TreeMap<K, Integer>>, TreeMap<K, 
Integer>> {
-
-               @Override
-               public TreeMap<K, Integer> map(Vertex<K, TreeMap<K, Integer>> 
vertex) throws Exception {
-                       return new TreeMap<K, Integer>();
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class AttachValues<K> implements 
VertexJoinFunction<TreeMap<K, Integer>,
-                       TreeMap<K, Integer>> {
-
-               @Override
-               public TreeMap<K, Integer> vertexJoin(TreeMap<K, Integer> 
vertexValue,
-                               TreeMap<K, Integer> inputValue) {
-                       return inputValue;
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class GatherHigherIdNeighbors<K> implements
-               ReduceNeighborsFunction<TreeMap<K,Integer>> {
-
-               @Override
-               public TreeMap<K, Integer> reduceNeighbors(TreeMap<K,Integer> 
first, TreeMap<K,Integer> second) {
-                       for (K key : second.keySet()) {
-                               Integer value = first.get(key);
-                               if (value != null) {
-                                       first.put(key, value + second.get(key));
-                               } else {
-                                       first.put(key, second.get(key));
-                               }
-                       }
-                       return first;
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class ComputeTriangles<K> implements 
MapFunction<Triplet<K, TreeMap<K, Integer>, NullValue>,
-                       Integer> {
-
-               @Override
-               public Integer map(Triplet<K, TreeMap<K, Integer>, NullValue> 
triplet) throws Exception {
-
-                       Vertex<K, TreeMap<K, Integer>> srcVertex = 
triplet.getSrcVertex();
-                       Vertex<K, TreeMap<K, Integer>> trgVertex = 
triplet.getTrgVertex();
-                       int triangles = 0;
-
-                       if(trgVertex.getValue().get(srcVertex.getId()) != null) 
{
-                               triangles = 
trgVertex.getValue().get(srcVertex.getId());
-                       }
-                       return triangles;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
new file mode 100644
index 0000000..fc89e43
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Graph;
+import 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient.Result;
+import org.apache.flink.graph.library.metric.undirected.VertexMetrics;
+import org.apache.flink.types.CopyableValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * The global clustering coefficient measures the connectedness of a graph.
+ * Scores range from 0.0 (no triangles) to 1.0 (complete graph).
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class GlobalClusteringCoefficient<K extends Comparable<K> & 
CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Result> {
+
+       private TriangleCount<K, VV, EV> triangleCount;
+
+       private VertexMetrics<K, VV, EV> vertexMetrics;
+
+       // Optional configuration
+       private int littleParallelism = PARALLELISM_DEFAULT;
+
+       /**
+        * Override the parallelism of operators processing small amounts of 
data.
+        *
+        * @param littleParallelism operator parallelism
+        * @return this
+        */
+       public GlobalClusteringCoefficient<K, VV, EV> setLittleParallelism(int 
littleParallelism) {
+               this.littleParallelism = littleParallelism;
+
+               return this;
+       }
+
+       @Override
+       public GlobalClusteringCoefficient<K, VV, EV> run(Graph<K, VV, EV> 
input)
+                       throws Exception {
+               super.run(input);
+
+               triangleCount = new TriangleCount<K, VV, EV>()
+                       .setLittleParallelism(littleParallelism);
+
+               input.run(triangleCount);
+
+               vertexMetrics = new VertexMetrics<K, VV, EV>()
+                       .setParallelism(littleParallelism);
+
+               input.run(vertexMetrics);
+
+               return this;
+       }
+
+       @Override
+       public Result getResult() {
+               return new 
Result(vertexMetrics.getResult().getNumberOfTriplets(), 3 * 
triangleCount.getResult());
+       }
+
+       /**
+        * Wraps global clustering coefficient metrics.
+        */
+       public static class Result {
+               private long tripletCount;
+               private long triangleCount;
+
+               public Result(long tripletCount, long triangleCount) {
+                       this.tripletCount = tripletCount;
+                       this.triangleCount = triangleCount;
+               }
+
+               /**
+                * Get the number of triplets.
+                *
+                * @return number of triplets
+                */
+               public long getNumberOfTriplets() {
+                       return tripletCount;
+               }
+
+               /**
+                * Get the number of triangles.
+                *
+                * @return number of triangles
+                */
+               public long getNumberOfTriangles() {
+                       return triangleCount;
+               }
+
+               /**
+                * Get the global clustering coefficient score. This is 
computed as the
+                * number of closed triplets (triangles) divided by the total 
number of
+                * triplets.
+                *
+                * A score of {@code Double.NaN} is returned for a graph of 
isolated vertices
+                * for which both the triangle count and number of neighbors 
are zero.
+                *
+                * @return global clustering coefficient score
+                */
+               public double getLocalClusteringCoefficientScore() {
+                       return (tripletCount == 0) ? Double.NaN : triangleCount 
/ (double)tripletCount;
+               }
+
+               @Override
+               public String toString() {
+                       return "triplet count: " + tripletCount + ", triangle 
count:" + triangleCount;
+               }
+
+               @Override
+               public int hashCode() {
+                       return new HashCodeBuilder()
+                               .append(tripletCount)
+                               .append(triangleCount)
+                               .hashCode();
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj == null) { return false; }
+                       if (obj == this) { return true; }
+                       if (obj.getClass() != getClass()) { return false; }
+
+                       Result rhs = (Result)obj;
+
+                       return new EqualsBuilder()
+                               .append(tripletCount, rhs.tripletCount)
+                               .append(triangleCount, rhs.triangleCount)
+                               .isEquals();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index d1618d1..bc62d36 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -185,7 +185,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
        }
 
        /**
-        * Wraps the vertex type to encapsulate results from the Clustering 
Coefficient algorithm.
+        * Wraps the vertex type to encapsulate results from the local 
clustering coefficient algorithm.
         *
         * @param <T> ID type
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java
new file mode 100644
index 0000000..bc43725
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.CountHelper;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.util.AbstractID;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Count the number of distinct triangles in an undirected graph.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ * @see TriangleListing
+ */
+public class TriangleCount<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Long> {
+
+       private String id = new AbstractID().toString();
+
+       // Optional configuration
+       private int littleParallelism = PARALLELISM_DEFAULT;
+
+       /**
+        * Override the parallelism of operators processing small amounts of 
data.
+        *
+        * @param littleParallelism operator parallelism
+        * @return this
+        */
+       public TriangleCount<K, VV, EV> setLittleParallelism(int 
littleParallelism) {
+               this.littleParallelism = littleParallelism;
+
+               return this;
+       }
+
+       @Override
+       public TriangleCount<K, VV, EV> run(Graph<K, VV, EV> input)
+                       throws Exception {
+               super.run(input);
+
+               DataSet<Tuple3<K, K, K>> triangles = input
+                       .run(new TriangleListing<K, VV, EV>()
+                               .setSortTriangleVertices(false)
+                               .setLittleParallelism(littleParallelism));
+
+               triangles.output(new CountHelper<Tuple3<K, K, 
K>>(id)).name("Count triangles");
+
+               return this;
+       }
+
+       @Override
+       public Long getResult() {
+               return env.getLastJobExecutionResult().<Long> 
getAccumulatorResult(id);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
index 1319d02..6245433 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
@@ -286,11 +286,9 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, 
K>>> {
                @Override
                public Tuple3<T, T, T> map(Tuple3<T, T, T> value)
                                throws Exception {
-                       T temp_val;
-
                        // by the triangle listing algorithm we know f1 < f2
                        if (value.f0.compareTo(value.f1) > 0) {
-                               temp_val = value.f0;
+                               T temp_val = value.f0;
                                value.f0 = value.f1;
 
                                if (temp_val.compareTo(value.f2) <= 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
new file mode 100644
index 0000000..41ae27a
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.metric.undirected;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
+import org.apache.flink.graph.library.metric.undirected.VertexMetrics.Result;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.AbstractID;
+
+import java.io.IOException;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Compute the number of vertices, number of edges, and number of triplets in
+ * an undirected graph.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class VertexMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Result> {
+
+       private String id = new AbstractID().toString();
+
+       // Optional configuration
+       private boolean includeZeroDegreeVertices = false;
+
+       private boolean reduceOnTargetId = false;
+
+       private int parallelism = PARALLELISM_DEFAULT;
+
+       /**
+        * By default only the edge set is processed for the computation of 
degree.
+        * When this flag is set an additional join is performed against the 
vertex
+        * set in order to output vertices with a degree of zero.
+        *
+        * @param includeZeroDegreeVertices whether to output vertices with a
+        *                                  degree of zero
+        * @return this
+        */
+       public VertexMetrics<K, VV, EV> setIncludeZeroDegreeVertices(boolean 
includeZeroDegreeVertices) {
+               this.includeZeroDegreeVertices = includeZeroDegreeVertices;
+
+               return this;
+       }
+
+       /**
+        * The degree can be counted from either the edge source or target IDs.
+        * By default the source IDs are counted. Reducing on target IDs may
+        * optimize the algorithm if the input edge list is sorted by target ID.
+        *
+        * @param reduceOnTargetId set to {@code true} if the input edge list
+        *                         is sorted by target ID
+        * @return this
+        */
+       public VertexMetrics<K, VV, EV> setReduceOnTargetId(boolean 
reduceOnTargetId) {
+               this.reduceOnTargetId = reduceOnTargetId;
+
+               return this;
+       }
+
+       /**
+        * Override the operator parallelism.
+        *
+        * @param parallelism operator parallelism
+        * @return this
+        */
+       public VertexMetrics<K, VV, EV> setParallelism(int parallelism) {
+               this.parallelism = parallelism;
+
+               return this;
+       }
+
+       @Override
+       public VertexMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
+                       throws Exception {
+               super.run(input);
+
+               DataSet<Vertex<K, LongValue>> vertexDegree = input
+                       .run(new VertexDegree<K, VV, EV>()
+                               
.setIncludeZeroDegreeVertices(includeZeroDegreeVertices)
+                               .setReduceOnTargetId(reduceOnTargetId)
+                               .setParallelism(parallelism));
+
+               vertexDegree.output(new 
VertexMetricsHelper<K>(id)).name("Vertex metrics");
+
+               return this;
+       }
+
+       @Override
+       public Result getResult() {
+               JobExecutionResult res = env.getLastJobExecutionResult();
+
+               long vertexCount = res.getAccumulatorResult(id + "-0");
+               long edgeCount = res.getAccumulatorResult(id + "-1");
+               long tripletCount = res.getAccumulatorResult(id + "-2");
+
+               return new Result(vertexCount, edgeCount / 2, tripletCount);
+       }
+
+       /**
+        * Helper class to collect vertex metrics.
+        *
+        * @param <T> ID type
+        */
+       private static class VertexMetricsHelper<T>
+       extends RichOutputFormat<Vertex<T, LongValue>> {
+               private final String id;
+
+               private long vertexCount;
+               private long edgeCount;
+               private long tripletCount;
+
+               /**
+                * This helper class collects vertex metrics by scanning over 
and
+                * discarding elements from the given DataSet.
+                *
+                * The unique id is required because Flink's accumulator 
namespace is
+                * among all operators.
+                *
+                * @param id unique string used for accumulator names
+                */
+               public VertexMetricsHelper(String id) {
+                       this.id = id;
+               }
+
+               @Override
+               public void configure(Configuration parameters) {}
+
+               @Override
+               public void open(int taskNumber, int numTasks) throws 
IOException {}
+
+               @Override
+               public void writeRecord(Vertex<T, LongValue> record) throws 
IOException {
+                       long degree = record.f1.getValue();
+
+                       vertexCount++;
+                       edgeCount += degree;
+                       tripletCount += degree * (degree - 1) / 2;
+               }
+
+               @Override
+               public void close() throws IOException {
+                       getRuntimeContext().addAccumulator(id + "-0", new 
LongCounter(vertexCount));
+                       getRuntimeContext().addAccumulator(id + "-1", new 
LongCounter(edgeCount));
+                       getRuntimeContext().addAccumulator(id + "-2", new 
LongCounter(tripletCount));
+               }
+       }
+
+       /**
+        * Wraps vertex metrics.
+        */
+       public static class Result {
+               private long vertexCount;
+               private long edgeCount;
+               private long tripletCount;
+
+               public Result(long vertexCount, long edgeCount, long 
tripletCount) {
+                       this.vertexCount = vertexCount;
+                       this.edgeCount = edgeCount;
+                       this.tripletCount = tripletCount;
+               }
+
+               /**
+                * Get the number of vertices.
+                *
+                * @return number of vertices
+                */
+               public long getNumberOfVertices() {
+                       return vertexCount;
+               }
+
+               /**
+                * Get the number of edges.
+                *
+                * @return number of edges
+                */
+               public long getNumberOfEdges() {
+                       return edgeCount;
+               }
+
+               /**
+                * Get the number of triplets.
+                *
+                * @return number of triplets
+                */
+               public long getNumberOfTriplets() {
+                       return tripletCount;
+               }
+
+               @Override
+               public String toString() {
+                       return "vertex count: " + vertexCount
+                                       + ", edge count:" + edgeCount
+                                       + ", triplet count: " + tripletCount;
+               }
+
+               @Override
+               public int hashCode() {
+                       return new HashCodeBuilder()
+                               .append(vertexCount)
+                               .append(edgeCount)
+                               .append(tripletCount)
+                               .hashCode();
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj == null) { return false; }
+                       if (obj == this) { return true; }
+                       if (obj.getClass() != getClass()) { return false; }
+
+                       Result rhs = (Result)obj;
+
+                       return new EqualsBuilder()
+                               .append(vertexCount, rhs.vertexCount)
+                               .append(edgeCount, rhs.edgeCount)
+                               .append(tripletCount, rhs.tripletCount)
+                               .isEquals();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficientTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficientTest.java
new file mode 100644
index 0000000..71ec2a6
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficientTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
+import org.apache.flink.graph.asm.AsmTestBase;
+import 
org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class GlobalClusteringCoefficientTest
+extends AsmTestBase {
+
+       @Test
+       public void testWithSimpleGraph()
+                       throws Exception {
+               Result expectedResult = new Result(13, 6);
+
+               Result globalClusteringCoefficient = new 
GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
+                       .run(undirectedSimpleGraph)
+                       .execute();
+
+               assertEquals(expectedResult, globalClusteringCoefficient);
+       }
+
+       @Test
+       public void testWithCompleteGraph()
+                       throws Exception {
+               long expectedDegree = completeGraphVertexCount - 1;
+               long expectedCount = completeGraphVertexCount * 
CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+
+               Result expectedResult = new Result(expectedCount, 
expectedCount);
+
+               Result globalClusteringCoefficient = new 
GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
+                       .run(completeGraph)
+                       .execute();
+
+               assertEquals(expectedResult, globalClusteringCoefficient);
+       }
+
+       @Test
+       public void testWithEmptyGraph()
+                       throws Exception {
+               Result expectedResult = new Result(0, 0);
+
+               Result globalClusteringCoefficient = new 
GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
+                       .run(emptyGraph)
+                       .execute();
+
+               assertEquals(expectedResult, globalClusteringCoefficient);
+       }
+
+       @Test
+       public void testWithRMatGraph()
+                       throws Exception {
+               Result expectedResult = new Result(1003442, 225147);
+
+               Result globalClusteringCoefficient = new 
GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
+                       .run(undirectedRMatGraph)
+                       .execute();
+
+               assertEquals(expectedResult, globalClusteringCoefficient);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
index 414f200..3455df4 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
@@ -30,6 +30,8 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 import org.junit.Test;
 
+import java.util.List;
+
 import static org.junit.Assert.assertEquals;
 
 public class LocalClusteringCoefficientTest
@@ -61,7 +63,11 @@ extends AsmTestBase {
                DataSet<Result<LongValue>> cc = completeGraph
                        .run(new LocalClusteringCoefficient<LongValue, 
NullValue, NullValue>());
 
-               for (Result<LongValue> result : cc.collect()) {
+               List<Result<LongValue>> results = cc.collect();
+
+               assertEquals(completeGraphVertexCount, results.size());
+
+               for (Result<LongValue> result : results) {
                        assertEquals(expectedDegree, 
result.getDegree().getValue());
                        assertEquals(expectedTriangleCount, 
result.getTriangleCount().getValue());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java
new file mode 100644
index 0000000..6bf9b0d
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TriangleCountTest
+extends AsmTestBase {
+
+       @Test
+       public void testWithSimpleGraph()
+                       throws Exception {
+               long triangleCount = new TriangleCount<IntValue, NullValue, 
NullValue>()
+                       .run(undirectedSimpleGraph)
+                       .execute();
+
+               assertEquals(2, triangleCount);
+       }
+
+       @Test
+       public void testWithCompleteGraph()
+                       throws Exception {
+               long expectedDegree = completeGraphVertexCount - 1;
+               long expectedCount = completeGraphVertexCount * 
CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2) / 3;
+
+               long triangleCount = new TriangleCount<LongValue, NullValue, 
NullValue>()
+                       .run(completeGraph)
+                       .execute();
+
+               assertEquals(expectedCount, triangleCount);
+       }
+
+       @Test
+       public void testWithEmptyGraph()
+                       throws Exception {
+               long triangleCount = new TriangleCount<LongValue, NullValue, 
NullValue>()
+                       .run(emptyGraph)
+                       .execute();
+
+               assertEquals(0, triangleCount);
+       }
+
+       @Test
+       public void testWithRMatGraph()
+                       throws Exception {
+               long triangleCount = new TriangleCount<LongValue, NullValue, 
NullValue>()
+                       .run(undirectedRMatGraph)
+                       .execute();
+
+               assertEquals(75049, triangleCount);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
new file mode 100644
index 0000000..a36ca94
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.metric.undirected;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.library.metric.undirected.VertexMetrics.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class VertexMetricsTest
+extends AsmTestBase {
+
+       @Test
+       public void testWithSimpleGraph()
+                       throws Exception {
+               Result expectedResult = new Result(6, 7, 13);
+
+               Result vertexMetrics = new VertexMetrics<IntValue, NullValue, 
NullValue>()
+                       .run(undirectedSimpleGraph)
+                       .execute();
+
+               assertEquals(expectedResult, vertexMetrics);
+       }
+
+       @Test
+       public void testWithCompleteGraph()
+                       throws Exception {
+               long expectedDegree = completeGraphVertexCount - 1;
+               long expectedEdges = completeGraphVertexCount * expectedDegree 
/ 2;
+               long expectedTriplets = completeGraphVertexCount * 
CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+
+               Result expectedResult = new Result(completeGraphVertexCount, 
expectedEdges, expectedTriplets);
+
+               Result vertexMetrics = new VertexMetrics<LongValue, NullValue, 
NullValue>()
+                       .run(completeGraph)
+                       .execute();
+
+               assertEquals(expectedResult, vertexMetrics);
+       }
+
+       @Test
+       public void testWithEmptyGraph()
+                       throws Exception {
+               Result expectedResult;
+
+               expectedResult = new Result(0, 0, 0);
+
+               Result withoutZeroDegreeVertices = new VertexMetrics<LongValue, 
NullValue, NullValue>()
+                       .setIncludeZeroDegreeVertices(false)
+                       .run(emptyGraph)
+                       .execute();
+
+               assertEquals(withoutZeroDegreeVertices, expectedResult);
+
+               expectedResult = new Result(3, 0, 0);
+
+               Result withZeroDegreeVertices = new VertexMetrics<LongValue, 
NullValue, NullValue>()
+                       .setIncludeZeroDegreeVertices(true)
+                       .run(emptyGraph)
+                       .execute();
+
+               assertEquals(expectedResult, withZeroDegreeVertices);
+       }
+
+       @Test
+       public void testWithRMatGraph()
+                       throws Exception {
+               Result expectedResult = new Result(902, 10442, 1003442);
+
+               Result withoutZeroDegreeVertices = new VertexMetrics<LongValue, 
NullValue, NullValue>()
+                       .run(undirectedRMatGraph)
+                       .execute();
+
+               assertEquals(expectedResult, withoutZeroDegreeVertices);
+       }
+}

Reply via email to