[gelly] [refactoring] Removed the "Example" suffix from the Gelly example class names

Added the "Algorithm" suffix to the library method class names

This closes #625


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8f4039dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8f4039dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8f4039dc

Branch: refs/heads/master
Commit: 8f4039dcb326a1af276ac4b93ffe5ea4da3e19bc
Parents: 1479973
Author: andralungu <lungu.an...@gmail.com>
Authored: Sun Apr 26 20:09:04 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Tue May 19 22:38:03 2015 +0200

----------------------------------------------------------------------
 .../flink/graph/example/CommunityDetection.java | 140 ++++++++++++
 .../graph/example/ConnectedComponents.java      | 141 ++++++++++++
 .../example/ConnectedComponentsExample.java     | 141 ------------
 .../graph/example/EuclideanGraphExample.java    | 210 ------------------
 .../graph/example/EuclideanGraphWeighing.java   | 210 ++++++++++++++++++
 .../graph/example/GSAConnectedComponents.java   | 176 +++++++++++++++
 .../example/GSAConnectedComponentsExample.java  | 176 ---------------
 .../example/GSASingleSourceShortestPaths.java   | 178 +++++++++++++++
 .../GSASingleSourceShortestPathsExample.java    | 178 ---------------
 .../graph/example/JaccardSimilarityMeasure.java | 214 +++++++++++++++++++
 .../JaccardSimilarityMeasureExample.java        | 214 -------------------
 .../flink/graph/example/LabelPropagation.java   | 170 +++++++++++++++
 .../graph/example/LabelPropagationExample.java  | 170 ---------------
 .../flink/graph/example/MusicProfiles.java      |   4 +-
 .../apache/flink/graph/example/PageRank.java    | 149 +++++++++++++
 .../flink/graph/example/PageRankExample.java    | 149 -------------
 .../SimpleCommunityDetectionExample.java        | 140 ------------
 .../example/SingleSourceShortestPaths.java      | 133 ++++++++++++
 .../SingleSourceShortestPathsExample.java       | 133 ------------
 .../example/utils/CommunityDetectionData.java   |  65 ++++++
 .../utils/ConnectedComponentsDefaultData.java   |  52 +++++
 .../utils/ConnectedComponentsExampleData.java   |  52 -----
 .../utils/EdgeWithLongIdNullValueParser.java    |  33 ---
 .../graph/example/utils/EuclideanGraphData.java |  10 +-
 .../utils/SimpleCommunityDetectionData.java     |  65 ------
 .../utils/SingleSourceShortestPathsData.java    |   4 +
 .../library/CommunityDetectionAlgorithm.java    | 172 +++++++++++++++
 .../graph/library/ConnectedComponents.java      |  88 --------
 .../library/ConnectedComponentsAlgorithm.java   |  88 ++++++++
 .../flink/graph/library/LabelPropagation.java   | 111 ----------
 .../library/LabelPropagationAlgorithm.java      | 114 ++++++++++
 .../apache/flink/graph/library/PageRank.java    | 100 ---------
 .../flink/graph/library/PageRankAlgorithm.java  | 104 +++++++++
 .../graph/library/SimpleCommunityDetection.java | 172 ---------------
 .../library/SingleSourceShortestPaths.java      | 108 ----------
 .../SingleSourceShortestPathsAlgorithm.java     | 111 ++++++++++
 .../flink/graph/test/GatherSumApplyITCase.java  |  10 +-
 .../test/example/CommunityDetectionITCase.java  | 100 +++++++++
 .../test/example/ConnectedComponentsITCase.java |  10 +-
 ...ctedComponentsWithRandomisedEdgesITCase.java |   4 +-
 .../example/EuclideanGraphExampleITCase.java    |  77 -------
 .../example/EuclideanGraphWeighingITCase.java   |  77 +++++++
 .../JaccardSimilarityMeasureExampleITCase.java  |  72 -------
 .../example/JaccardSimilarityMeasureITCase.java |  72 +++++++
 .../example/LabelPropagationExampleITCase.java  | 143 -------------
 .../test/example/LabelPropagationITCase.java    | 143 +++++++++++++
 .../example/SimpleCommunityDetectionITCase.java | 100 ---------
 .../SingleSourceShortestPathsITCase.java        |   4 +-
 48 files changed, 2634 insertions(+), 2653 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
new file mode 100644
index 0000000..f9434d3
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.CommunityDetectionData;
+import org.apache.flink.graph.library.CommunityDetectionAlgorithm;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+
+/**
+ * This example shows how to use the {@link 
org.apache.flink.graph.library.CommunityDetectionAlgorithm}
+ * library method:
+ * <ul>
+ *     <li> with the edge data set given as a parameter
+ *     <li> with default data
+ * </ul>
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, weight which 
are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t1.0\n1\t3\t2.0\n</code> defines two edges,
+ * 1-2 with weight 1.0 and 1-3 with weight 2.0.
+ *
+ * Usage <code>CommunityDetection &lt;edge path&gt; &lt;result path&gt;
+ * &lt;number of iterations&gt; &lt;delta&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.CommunityDetectionData}
+ */
+public class CommunityDetection implements ProgramDescription {
+
+       @SuppressWarnings("serial")
+       public static void main(String [] args) throws Exception {
+
+               if(!parseParameters(args)) {
+                       return;
+               }
+
+               // set up the execution environment
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               // set up the graph
+               DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+               Graph<Long, Long, Double> graph = Graph.fromDataSet(edges,
+                               new MapFunction<Long, Long>() {
+                                       @Override
+                                       public Long map(Long label) throws 
Exception {
+                                               return label;
+                                       }
+                               }, env);
+
+               // the result is in the form of <vertexId, communityId>, where 
the communityId is the label
+               // which the vertex converged to
+               DataSet<Vertex<Long, Long>> communityVertices =
+                               graph.run(new 
CommunityDetectionAlgorithm(maxIterations, delta)).getVertices();
+
+               // emit result
+               if (fileOutput) {
+                       communityVertices.writeAsCsv(outputPath, "\n", ",");
+               } else {
+                       communityVertices.print();
+               }
+
+               env.execute("Executing Community Detection Example");
+       }
+
+       @Override
+       public String getDescription() {
+               return "Community Detection";
+       }
+
+       // 
*************************************************************************
+       // UTIL METHODS
+       // 
*************************************************************************
+
+       private static boolean fileOutput = false;
+       private static String edgeInputPath = null;
+       private static String outputPath = null;
+       private static Integer maxIterations = 
CommunityDetectionData.MAX_ITERATIONS;
+       private static Double delta = CommunityDetectionData.DELTA;
+
+       private static boolean parseParameters(String [] args) {
+               if(args.length > 0) {
+                       if(args.length != 4) {
+                               System.err.println("Usage CommunityDetection 
<edge path> <output path> " +
+                                               "<num iterations> <delta>");
+                               return false;
+                       }
+
+                       fileOutput = true;
+                       edgeInputPath = args[0];
+                       outputPath = args[1];
+                       maxIterations = Integer.parseInt(args[2]);
+                       delta = Double.parseDouble(args[3]);
+
+               } else {
+                       System.out.println("Executing SimpleCommunityDetection 
example with default parameters and built-in default data.");
+                       System.out.println("Provide parameters to read input 
data from files.");
+                       System.out.println("Usage CommunityDetection <edge 
path> <output path> " +
+                                       "<num iterations> <delta>");
+               }
+
+               return true;
+       }
+
+       private static DataSet<Edge<Long, Double>> 
getEdgesDataSet(ExecutionEnvironment env) {
+
+               if(fileOutput) {
+                       return env.readCsvFile(edgeInputPath)
+                                       .ignoreComments("#")
+                                       .fieldDelimiter("\t")
+                                       .lineDelimiter("\n")
+                                       .types(Long.class, Long.class, 
Double.class)
+                                       .map(new Tuple3ToEdgeMap<Long, 
Double>());
+               } else {
+                       return 
CommunityDetectionData.getDefaultEdgeDataSet(env);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
new file mode 100644
index 0000000..b7c9045
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData;
+import org.apache.flink.graph.library.ConnectedComponentsAlgorithm;
+import org.apache.flink.types.NullValue;
+
+/**
+ * This example shows how to use the {@link 
org.apache.flink.graph.library.ConnectedComponentsAlgorithm}
+ * library method:
+ * <ul>
+ *     <li> with the edge data set given as a parameter
+ *     <li> with default data
+ * </ul>
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\n1\t3\n</code> defines two edges,
+ * 1-2 and 1-3.
+ *
+ * Usage <code>ConnectedComponents &lt;edge path&gt; &lt;result path&gt;
+ * &lt;number of iterations&gt; </code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData}
+ */
+public class ConnectedComponents implements ProgramDescription {
+
+       @SuppressWarnings("serial")
+       public static void main(String [] args) throws Exception {
+
+               if(!parseParameters(args)) {
+                       return;
+               }
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env);
+
+               Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, 
new MapFunction<Long, Long>() {
+                       @Override
+                       public Long map(Long value) throws Exception {
+                               return value;
+                       }
+               }, env);
+
+               DataSet<Vertex<Long, Long>> verticesWithMinIds = graph
+                               .run(new 
ConnectedComponentsAlgorithm(maxIterations)).getVertices();
+
+               // emit result
+               if (fileOutput) {
+                       verticesWithMinIds.writeAsCsv(outputPath, "\n", ",");
+               } else {
+                       verticesWithMinIds.print();
+               }
+
+               env.execute("Connected Components Example");
+       }
+
+       @Override
+       public String getDescription() {
+               return "Connected Components Example";
+       }
+
+       // 
*************************************************************************
+       // UTIL METHODS
+       // 
*************************************************************************
+
+       private static boolean fileOutput = false;
+       private static String edgeInputPath = null;
+       private static String outputPath = null;
+       private static Integer maxIterations = 
ConnectedComponentsDefaultData.MAX_ITERATIONS;
+
+       private static boolean parseParameters(String [] args) {
+               if(args.length > 0) {
+                       if(args.length != 3) {
+                               System.err.println("Usage ConnectedComponents 
<edge path> <output path> " +
+                                               "<num iterations>");
+                               return false;
+                       }
+
+                       fileOutput = true;
+                       edgeInputPath = args[0];
+                       outputPath = args[1];
+                       maxIterations = Integer.parseInt(args[2]);
+
+               } else {
+                       System.out.println("Executing ConnectedComponents 
example with default parameters and built-in default data.");
+                       System.out.println("Provide parameters to read input 
data from files.");
+                       System.out.println("Usage ConnectedComponents <edge 
path> <output path> " +
+                                       "<num iterations>");
+               }
+
+               return true;
+       }
+
+       @SuppressWarnings("serial")
+       private static DataSet<Edge<Long, NullValue>> 
getEdgesDataSet(ExecutionEnvironment env) {
+
+               if(fileOutput) {
+                       return env.readCsvFile(edgeInputPath)
+                                       .ignoreComments("#")
+                                       .fieldDelimiter("\t")
+                                       .lineDelimiter("\n")
+                                       .types(Long.class, Long.class)
+                                       .map(new MapFunction<Tuple2<Long, 
Long>, Edge<Long, NullValue>>() {
+                                               @Override
+                                               public Edge<Long, NullValue> 
map(Tuple2<Long, Long> value) throws Exception {
+                                                       return new Edge<Long, 
NullValue>(value.f0, value.f1, NullValue.getInstance());
+                                               }
+                                       });
+               } else {
+                       return 
ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponentsExample.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponentsExample.java
deleted file mode 100644
index a185a70..0000000
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponentsExample.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.ConnectedComponentsExampleData;
-import org.apache.flink.graph.library.ConnectedComponents;
-import org.apache.flink.types.NullValue;
-
-/**
- * This example shows how to use the {@link 
org.apache.flink.graph.library.ConnectedComponents}
- * library method:
- * <ul>
- *     <li> with the edge data set given as a parameter
- *     <li> with default data
- * </ul>
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\n1\t3\n</code> defines two edges,
- * 1-2 with and 1-3.
- *
- * Usage <code>ConnectedComponents &lt;edge path&gt; &lt;result path&gt;
- * &lt;number of iterations&gt; </code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.ConnectedComponentsExampleData}
- */
-public class ConnectedComponentsExample implements ProgramDescription {
-
-       @SuppressWarnings("serial")
-       public static void main(String [] args) throws Exception {
-
-               if(!parseParameters(args)) {
-                       return;
-               }
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env);
-
-               Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, 
new MapFunction<Long, Long>() {
-                       @Override
-                       public Long map(Long value) throws Exception {
-                               return value;
-                       }
-               }, env);
-
-               DataSet<Vertex<Long, Long>> verticesWithMinIds = graph
-                               .run(new 
ConnectedComponents(maxIterations)).getVertices();
-
-               // emit result
-               if (fileOutput) {
-                       verticesWithMinIds.writeAsCsv(outputPath, "\n", ",");
-               } else {
-                       verticesWithMinIds.print();
-               }
-
-               env.execute("Connected Components Example");
-       }
-
-       @Override
-       public String getDescription() {
-               return "Connected Components Example";
-       }
-
-       // 
*************************************************************************
-       // UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileOutput = false;
-       private static String edgeInputPath = null;
-       private static String outputPath = null;
-       private static Integer maxIterations = 
ConnectedComponentsExampleData.MAX_ITERATIONS;
-
-       private static boolean parseParameters(String [] args) {
-               if(args.length > 0) {
-                       if(args.length != 3) {
-                               System.err.println("Usage ConnectedComponents 
<edge path> <output path> " +
-                                               "<num iterations>");
-                               return false;
-                       }
-
-                       fileOutput = true;
-                       edgeInputPath = args[0];
-                       outputPath = args[1];
-                       maxIterations = Integer.parseInt(args[2]);
-
-               } else {
-                       System.out.println("Executing ConnectedComponents 
example with default parameters and built-in default data.");
-                       System.out.println("Provide parameters to read input 
data from files.");
-                       System.out.println("Usage ConnectedComponents <edge 
path> <output path> " +
-                                       "<num iterations>");
-               }
-
-               return true;
-       }
-
-       @SuppressWarnings("serial")
-       private static DataSet<Edge<Long, NullValue>> 
getEdgesDataSet(ExecutionEnvironment env) {
-
-               if(fileOutput) {
-                       return env.readCsvFile(edgeInputPath)
-                                       .ignoreComments("#")
-                                       .fieldDelimiter("\t")
-                                       .lineDelimiter("\n")
-                                       .types(Long.class, Long.class)
-                                       .map(new MapFunction<Tuple2<Long, 
Long>, Edge<Long, NullValue>>() {
-                                               @Override
-                                               public Edge<Long, NullValue> 
map(Tuple2<Long, Long> value) throws Exception {
-                                                       return new Edge<Long, 
NullValue>(value.f0, value.f1, NullValue.getInstance());
-                                               }
-                                       });
-               } else {
-                       return 
ConnectedComponentsExampleData.getDefaultEdgeDataSet(env);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
deleted file mode 100644
index fa08084..0000000
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Triplet;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.EuclideanGraphData;
-
-import java.io.Serializable;
-
-/**
- * Given a directed, unweighted graph, with vertex values representing points 
in a plan,
- * return a weighted graph where the edge weights are equal to the Euclidean 
distance between the
- * src and the trg vertex values.
- *
- * <p>
- * Input files are plain text files and must be formatted as follows:
- * <ul>
- *     <li> Vertices are represented by their vertexIds and vertex values and 
are separated by newlines,
- *     the value being formed of two doubles separated by a comma.
- *     For example: <code>1,1.0,1.0\n2,2.0,2.0\n3,3.0,3.0\n</code> defines a 
data set of three vertices
- *     <li> Edges are represented by pairs of srcVertexId, trgVertexId 
separated by commas.
- *     Edges themselves are separated by newlines.
- *     For example: <code>1,2\n1,3\n</code> defines two edges 1-2 and 1-3.
- * </ul>
- * </p>
- *
- * Usage <code>EuclideanGraphExample &lt;vertex path&gt; &lt;edge path&gt; 
&lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.EuclideanGraphData}
- */
-@SuppressWarnings("serial")
-public class EuclideanGraphExample implements ProgramDescription {
-
-       public static void main(String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Vertex<Long, Point>> vertices = getVerticesDataSet(env);
-
-               DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
-
-               Graph<Long, Point, Double> graph = Graph.fromDataSet(vertices, 
edges, env);
-
-               // the edge value will be the Euclidean distance between its 
src and trg vertex
-               DataSet<Tuple3<Long, Long, Double>> edgesWithEuclideanWeight = 
graph.getTriplets()
-                               .map(new MapFunction<Triplet<Long, Point, 
Double>, Tuple3<Long, Long, Double>>() {
-
-                                       @Override
-                                       public Tuple3<Long, Long, Double> 
map(Triplet<Long, Point, Double> triplet)
-                                                       throws Exception {
-
-                                               Vertex<Long, Point> srcVertex = 
triplet.getSrcVertex();
-                                               Vertex<Long, Point> trgVertex = 
triplet.getTrgVertex();
-
-                                               return new Tuple3<Long, Long, 
Double>(srcVertex.getId(), trgVertex.getId(),
-                                                               
srcVertex.getValue().euclideanDistance(trgVertex.getValue()));
-                                       }
-                               });
-
-               Graph<Long, Point, Double> resultedGraph = 
graph.joinWithEdges(edgesWithEuclideanWeight,
-                               new MapFunction<Tuple2<Double, Double>, 
Double>() {
-
-                                       @Override
-                                       public Double map(Tuple2<Double, 
Double> distance) throws Exception {
-                                               return distance.f1;
-                                       }
-                               });
-
-               // retrieve the edges from the final result
-               DataSet<Edge<Long, Double>> result = resultedGraph.getEdges();
-
-               // emit result
-               if (fileOutput) {
-                       result.writeAsCsv(outputPath, "\n", ",");
-               } else {
-                       result.print();
-               }
-
-               env.execute("Euclidean Graph Example");
-       }
-
-       @Override
-       public String getDescription() {
-               return "Weighing a graph by computing the Euclidean distance " +
-                               "between its vertices";
-       }
-
-       // 
*************************************************************************
-       //     DATA TYPES
-       // 
*************************************************************************
-
-       /**
-        * A simple two-dimensional point.
-        */
-       public static class Point implements Serializable {
-
-               public double x, y;
-
-               public Point() {}
-
-               public Point(double x, double y) {
-                       this.x = x;
-                       this.y = y;
-               }
-
-               public double euclideanDistance(Point other) {
-                       return Math.sqrt((x-other.x)*(x-other.x) + 
(y-other.y)*(y-other.y));
-               }
-
-               @Override
-               public String toString() {
-                       return x + " " + y;
-               }
-       }
-
-       // 
******************************************************************************************************************
-       // UTIL METHODS
-       // 
******************************************************************************************************************
-
-       private static boolean fileOutput = false;
-
-       private static String verticesInputPath = null;
-
-       private static String edgesInputPath = null;
-
-       private static String outputPath = null;
-
-       private static boolean parseParameters(String[] args) {
-
-               if (args.length > 0) {
-                       if (args.length == 3) {
-                               fileOutput = true;
-                               verticesInputPath = args[0];
-                               edgesInputPath = args[1];
-                               outputPath = args[2];
-                       } else {
-                               System.out.println("Executing Euclidean Graph 
example with default parameters and built-in default data.");
-                               System.out.println("Provide parameters to read 
input data from files.");
-                               System.out.println("See the documentation for 
the correct format of input files.");
-                               System.err.println("Usage: 
EuclideanGraphExample <input vertices path> <input edges path>" +
-                                               " <output path>");
-                               return false;
-                       }
-               }
-               return true;
-       }
-
-       private static DataSet<Vertex<Long, Point>> 
getVerticesDataSet(ExecutionEnvironment env) {
-               if (fileOutput) {
-                       return env.readCsvFile(verticesInputPath)
-                                       .lineDelimiter("\n")
-                                       .types(Long.class, Double.class, 
Double.class)
-                                       .map(new MapFunction<Tuple3<Long, 
Double, Double>, Vertex<Long, Point>>() {
-
-                                               @Override
-                                               public Vertex<Long, Point> 
map(Tuple3<Long, Double, Double> value) throws Exception {
-                                                       return new Vertex<Long, 
Point>(value.f0, new Point(value.f1, value.f2));
-                                               }
-                                       });
-               } else {
-                       return EuclideanGraphData.getDefaultVertexDataSet(env);
-               }
-       }
-
-       private static DataSet<Edge<Long, Double>> 
getEdgesDataSet(ExecutionEnvironment env) {
-               if (fileOutput) {
-                       return env.readCsvFile(edgesInputPath)
-                                       .lineDelimiter("\n")
-                                       .types(Long.class, Long.class)
-                                       .map(new MapFunction<Tuple2<Long, 
Long>, Edge<Long, Double>>() {
-
-                                               @Override
-                                               public Edge<Long, Double> 
map(Tuple2<Long, Long> tuple2) throws Exception {
-                                                       return new Edge<Long, 
Double>(tuple2.f0, tuple2.f1, 0.0);
-                                               }
-                                       });
-               } else {
-                       return EuclideanGraphData.getDefaultEdgeDataSet(env);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
new file mode 100644
index 0000000..565ef69
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.EuclideanGraphData;
+
+import java.io.Serializable;
+
+/**
+ * Given a directed, unweighted graph, with vertex values representing points 
in a plan,
+ * return a weighted graph where the edge weights are equal to the Euclidean 
distance between the
+ * src and the trg vertex values.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ *     <li> Vertices are represented by their vertexIds and vertex values and 
are separated by newlines,
+ *     the value being formed of two doubles separated by a comma.
+ *     For example: <code>1,1.0,1.0\n2,2.0,2.0\n3,3.0,3.0\n</code> defines a 
data set of three vertices
+ *     <li> Edges are represented by pairs of srcVertexId, trgVertexId 
separated by commas.
+ *     Edges themselves are separated by newlines.
+ *     For example: <code>1,2\n1,3\n</code> defines two edges 1-2 and 1-3.
+ * </ul>
+ * </p>
+ *
+ * Usage <code>EuclideanGraphWeighing &lt;vertex path&gt; &lt;edge path&gt; 
&lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.EuclideanGraphData}
+ */
+@SuppressWarnings("serial")
+public class EuclideanGraphWeighing implements ProgramDescription {
+
+       public static void main(String[] args) throws Exception {
+
+               if (!parseParameters(args)) {
+                       return;
+               }
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Vertex<Long, Point>> vertices = getVerticesDataSet(env);
+
+               DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+               Graph<Long, Point, Double> graph = Graph.fromDataSet(vertices, 
edges, env);
+
+               // the edge value will be the Euclidean distance between its 
src and trg vertex
+               DataSet<Tuple3<Long, Long, Double>> edgesWithEuclideanWeight = 
graph.getTriplets()
+                               .map(new MapFunction<Triplet<Long, Point, 
Double>, Tuple3<Long, Long, Double>>() {
+
+                                       @Override
+                                       public Tuple3<Long, Long, Double> 
map(Triplet<Long, Point, Double> triplet)
+                                                       throws Exception {
+
+                                               Vertex<Long, Point> srcVertex = 
triplet.getSrcVertex();
+                                               Vertex<Long, Point> trgVertex = 
triplet.getTrgVertex();
+
+                                               return new Tuple3<Long, Long, 
Double>(srcVertex.getId(), trgVertex.getId(),
+                                                               
srcVertex.getValue().euclideanDistance(trgVertex.getValue()));
+                                       }
+                               });
+
+               Graph<Long, Point, Double> resultedGraph = 
graph.joinWithEdges(edgesWithEuclideanWeight,
+                               new MapFunction<Tuple2<Double, Double>, 
Double>() {
+
+                                       @Override
+                                       public Double map(Tuple2<Double, 
Double> distance) throws Exception {
+                                               return distance.f1;
+                                       }
+                               });
+
+               // retrieve the edges from the final result
+               DataSet<Edge<Long, Double>> result = resultedGraph.getEdges();
+
+               // emit result
+               if (fileOutput) {
+                       result.writeAsCsv(outputPath, "\n", ",");
+               } else {
+                       result.print();
+               }
+
+               env.execute("Euclidean Graph Weighing Example");
+       }
+
+       @Override
+       public String getDescription() {
+               return "Weighing a graph by computing the Euclidean distance " +
+                               "between its vertices";
+       }
+
+       // 
*************************************************************************
+       //     DATA TYPES
+       // 
*************************************************************************
+
+       /**
+        * A simple two-dimensional point.
+        */
+       public static class Point implements Serializable {
+
+               public double x, y;
+
+               public Point() {}
+
+               public Point(double x, double y) {
+                       this.x = x;
+                       this.y = y;
+               }
+
+               public double euclideanDistance(Point other) {
+                       return Math.sqrt((x-other.x)*(x-other.x) + 
(y-other.y)*(y-other.y));
+               }
+
+               @Override
+               public String toString() {
+                       return x + " " + y;
+               }
+       }
+
+       // 
******************************************************************************************************************
+       // UTIL METHODS
+       // 
******************************************************************************************************************
+
+       private static boolean fileOutput = false;
+
+       private static String verticesInputPath = null;
+
+       private static String edgesInputPath = null;
+
+       private static String outputPath = null;
+
+       private static boolean parseParameters(String[] args) {
+
+               if (args.length > 0) {
+                       if (args.length == 3) {
+                               fileOutput = true;
+                               verticesInputPath = args[0];
+                               edgesInputPath = args[1];
+                               outputPath = args[2];
+                       } else {
+                               System.out.println("Executing Euclidean Graph 
Weighing example with default parameters and built-in default data.");
+                               System.out.println("Provide parameters to read 
input data from files.");
+                               System.out.println("See the documentation for 
the correct format of input files.");
+                               System.err.println("Usage: 
EuclideanGraphWeighing <input vertices path> <input edges path>" +
+                                               " <output path>");
+                               return false;
+                       }
+               }
+               return true;
+       }
+
+       private static DataSet<Vertex<Long, Point>> 
getVerticesDataSet(ExecutionEnvironment env) {
+               if (fileOutput) {
+                       return env.readCsvFile(verticesInputPath)
+                                       .lineDelimiter("\n")
+                                       .types(Long.class, Double.class, 
Double.class)
+                                       .map(new MapFunction<Tuple3<Long, 
Double, Double>, Vertex<Long, Point>>() {
+
+                                               @Override
+                                               public Vertex<Long, Point> 
map(Tuple3<Long, Double, Double> value) throws Exception {
+                                                       return new Vertex<Long, 
Point>(value.f0, new Point(value.f1, value.f2));
+                                               }
+                                       });
+               } else {
+                       return EuclideanGraphData.getDefaultVertexDataSet(env);
+               }
+       }
+
+       private static DataSet<Edge<Long, Double>> 
getEdgesDataSet(ExecutionEnvironment env) {
+               if (fileOutput) {
+                       return env.readCsvFile(edgesInputPath)
+                                       .lineDelimiter("\n")
+                                       .types(Long.class, Long.class)
+                                       .map(new MapFunction<Tuple2<Long, 
Long>, Edge<Long, Double>>() {
+
+                                               @Override
+                                               public Edge<Long, Double> 
map(Tuple2<Long, Long> tuple2) throws Exception {
+                                                       return new Edge<Long, 
Double>(tuple2.f0, tuple2.f1, 0.0);
+                                               }
+                                       });
+               } else {
+                       return EuclideanGraphData.getDefaultEdgeDataSet(env);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java
new file mode 100755
index 0000000..30855e4
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * This is an implementation of the Connected Components algorithm, using a 
gather-sum-apply iteration
+ */
+public class GSAConnectedComponents implements ProgramDescription {
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Program
+       // 
--------------------------------------------------------------------------------------------
+
+       public static void main(String[] args) throws Exception {
+
+               if (!parseParameters(args)) {
+                       return;
+               }
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
+
+               Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, 
new InitVertices(), env);
+
+               // Execute the GSA iteration
+               Graph<Long, Long, NullValue> result =
+                               graph.runGatherSumApplyIteration(new 
GatherNeighborIds(), new SelectMinId(),
+                                               new UpdateComponentId(), 
maxIterations);
+
+               // Extract the vertices as the result
+               DataSet<Vertex<Long, Long>> connectedComponents = 
result.getVertices();
+
+               // emit result
+               if (fileOutput) {
+                       connectedComponents.writeAsCsv(outputPath, "\n", " ");
+               } else {
+                       connectedComponents.print();
+               }
+
+               env.execute("GSA Connected Components");
+       }
+
+       @SuppressWarnings("serial")
+       private static final class InitVertices implements MapFunction<Long, 
Long> {
+
+               public Long map(Long vertexId) {
+                       return vertexId;
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Connected Components UDFs
+       // 
--------------------------------------------------------------------------------------------
+
+       @SuppressWarnings("serial")
+       private static final class GatherNeighborIds extends 
GatherFunction<Long, NullValue, Long> {
+
+               public Long gather(Neighbor<Long, NullValue> neighbor) {
+                       return neighbor.getNeighborValue();
+               }
+       };
+
+       @SuppressWarnings("serial")
+       private static final class SelectMinId extends SumFunction<Long, 
NullValue, Long> {
+
+               public Long sum(Long newValue, Long currentValue) {
+                       return Math.min(newValue, currentValue);
+               }
+       };
+
+       @SuppressWarnings("serial")
+       private static final class UpdateComponentId extends 
ApplyFunction<Long, Long, Long> {
+
+               public void apply(Long summedValue, Long origValue) {
+                       if (summedValue < origValue) {
+                               setResult(summedValue);
+                       }
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Util methods
+       // 
--------------------------------------------------------------------------------------------
+
+       private static boolean fileOutput = false;
+       private static String edgeInputPath = null;
+       private static String outputPath = null;
+
+       private static int maxIterations = 16;
+
+       private static boolean parseParameters(String[] args) {
+
+               if (args.length > 0) {
+                       // parse input arguments
+                       fileOutput = true;
+
+                       if (args.length != 3) {
+                               System.err.println("Usage: 
GSAConnectedComponents <edge path> " +
+                                               "<result path> <max 
iterations>");
+                               return false;
+                       }
+
+                       edgeInputPath = args[0];
+                       outputPath = args[1];
+                       maxIterations = Integer.parseInt(args[2]);
+               } else {
+                       System.out.println("Executing GSA Connected Components 
example with built-in default data.");
+                       System.out.println("  Provide parameters to read input 
data from files.");
+                       System.out.println("  See the documentation for the 
correct format of input files.");
+                       System.out.println("  Usage: GSAConnectedComponents 
<edge path> <result path> <max iterations>");
+               }
+               return true;
+       }
+
+       @SuppressWarnings("serial")
+       private static DataSet<Edge<Long, NullValue>> 
getEdgeDataSet(ExecutionEnvironment env) {
+               if (fileOutput) {
+                       return env.readCsvFile(edgeInputPath)
+                                       .fieldDelimiter("\t")
+                                       .lineDelimiter("\n")
+                                       .types(Long.class, Long.class)
+                                       .map(new MapFunction<Tuple2<Long, 
Long>, Edge<Long, NullValue>>() {
+
+                                               public Edge<Long, NullValue> 
map(Tuple2<Long, Long> value) throws Exception {
+                                                       return new Edge<Long, 
NullValue>(value.f0, value.f1, NullValue.getInstance());
+                                               }
+                                       });
+               }
+
+               // Generates 3 components of size 2
+               return env.generateSequence(0, 2).flatMap(new 
FlatMapFunction<Long, Edge<Long, NullValue>>() {
+                       @Override
+                       public void flatMap(Long value, Collector<Edge<Long, 
NullValue>> out) throws Exception {
+                               out.collect(new Edge<Long, NullValue>(value, 
value + 3, NullValue.getInstance()));
+                       }
+               });
+       }
+
+       @Override
+       public String getDescription() {
+               return "GSA Connected Components";
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
deleted file mode 100755
index 7c39123..0000000
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
-
-/**
- * This is an implementation of the Connected Components algorithm, using a 
gather-sum-apply iteration
- */
-public class GSAConnectedComponentsExample implements ProgramDescription {
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Program
-       // 
--------------------------------------------------------------------------------------------
-
-       public static void main(String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
-
-               Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, 
new InitVertices(), env);
-
-               // Execute the GSA iteration
-               Graph<Long, Long, NullValue> result =
-                               graph.runGatherSumApplyIteration(new 
GatherNeighborIds(), new SelectMinId(),
-                                               new UpdateComponentId(), 
maxIterations);
-
-               // Extract the vertices as the result
-               DataSet<Vertex<Long, Long>> connectedComponents = 
result.getVertices();
-
-               // emit result
-               if (fileOutput) {
-                       connectedComponents.writeAsCsv(outputPath, "\n", " ");
-               } else {
-                       connectedComponents.print();
-               }
-
-               env.execute("GSA Connected Components");
-       }
-
-       @SuppressWarnings("serial")
-       private static final class InitVertices implements MapFunction<Long, 
Long> {
-
-               public Long map(Long vertexId) {
-                       return vertexId;
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Connected Components UDFs
-       // 
--------------------------------------------------------------------------------------------
-
-       @SuppressWarnings("serial")
-       private static final class GatherNeighborIds extends 
GatherFunction<Long, NullValue, Long> {
-
-               public Long gather(Neighbor<Long, NullValue> neighbor) {
-                       return neighbor.getNeighborValue();
-               }
-       };
-
-       @SuppressWarnings("serial")
-       private static final class SelectMinId extends SumFunction<Long, 
NullValue, Long> {
-
-               public Long sum(Long newValue, Long currentValue) {
-                       return Math.min(newValue, currentValue);
-               }
-       };
-
-       @SuppressWarnings("serial")
-       private static final class UpdateComponentId extends 
ApplyFunction<Long, Long, Long> {
-
-               public void apply(Long summedValue, Long origValue) {
-                       if (summedValue < origValue) {
-                               setResult(summedValue);
-                       }
-               }
-       };
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Util methods
-       // 
--------------------------------------------------------------------------------------------
-
-       private static boolean fileOutput = false;
-       private static String edgeInputPath = null;
-       private static String outputPath = null;
-
-       private static int maxIterations = 16;
-
-       private static boolean parseParameters(String[] args) {
-
-               if (args.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-
-                       if (args.length != 3) {
-                               System.err.println("Usage: 
GSAConnectedComponentsExample <edge path> " +
-                                               "<result path> <max 
iterations>");
-                               return false;
-                       }
-
-                       edgeInputPath = args[0];
-                       outputPath = args[1];
-                       maxIterations = Integer.parseInt(args[2]);
-               } else {
-                       System.out.println("Executing GSA Connected Components 
example with built-in default data.");
-                       System.out.println("  Provide parameters to read input 
data from files.");
-                       System.out.println("  See the documentation for the 
correct format of input files.");
-                       System.out.println("  Usage: 
GSAConnectedComponentsExample <edge path> <result path> <max iterations>");
-               }
-               return true;
-       }
-
-       @SuppressWarnings("serial")
-       private static DataSet<Edge<Long, NullValue>> 
getEdgeDataSet(ExecutionEnvironment env) {
-               if (fileOutput) {
-                       return env.readCsvFile(edgeInputPath)
-                                       .fieldDelimiter("\t")
-                                       .lineDelimiter("\n")
-                                       .types(Long.class, Long.class)
-                                       .map(new MapFunction<Tuple2<Long, 
Long>, Edge<Long, NullValue>>() {
-
-                                               public Edge<Long, NullValue> 
map(Tuple2<Long, Long> value) throws Exception {
-                                                       return new Edge<Long, 
NullValue>(value.f0, value.f1, NullValue.getInstance());
-                                               }
-                                       });
-               }
-
-               // Generates 3 components of size 2
-               return env.generateSequence(0, 2).flatMap(new 
FlatMapFunction<Long, Edge<Long, NullValue>>() {
-                       @Override
-                       public void flatMap(Long value, Collector<Edge<Long, 
NullValue>> out) throws Exception {
-                               out.collect(new Edge<Long, NullValue>(value, 
value + 3, NullValue.getInstance()));
-                       }
-               });
-       }
-
-       @Override
-       public String getDescription() {
-               return "GSA Connected Components";
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
new file mode 100755
index 0000000..bbc344f
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+
+/**
+ * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration.
+ *
+ * <p>
+ * The source vertex starts with distance 0.0 and every other vertex with positive infinity.
+ * In every superstep the gather phase proposes, per neighbor, the neighbor's distance plus the
+ * weight of the connecting edge; the sum phase keeps the smallest proposal; the apply phase
+ * adopts it only if it is smaller than the vertex's current distance. At most
+ * <code>maxIterations</code> supersteps are run (5 when no parameters are given).
+ * </p>
+ *
+ * <p>
+ * Input edges are read from a text file with one tab-separated
+ * <code>srcVertexId, trgVertexId, weight</code> triple per line.
+ * </p>
+ *
+ * Usage <code>GSASingleSourceShortestPaths &lt;source vertex id&gt; &lt;input edges path&gt;
+ * &lt;output path&gt; &lt;num iterations&gt;</code><br>
+ * If no parameters are provided, the program is run with source vertex 1 and default data from
+ * {@link org.apache.flink.graph.example.utils.SingleSourceShortestPathsData}
+ */
+public class GSASingleSourceShortestPaths implements ProgramDescription {
+
+       // --------------------------------------------------------------------------------------------
+       //  Program
+       // --------------------------------------------------------------------------------------------
+
+       public static void main(String[] args) throws Exception {
+
+               if(!parseParameters(args)) {
+                       return;
+               }
+
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
+
+               // vertices are derived from the edges; their initial value is the starting distance
+               Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);
+
+               // Execute the GSA iteration
+               Graph<Long, Double, Double> result = graph
+                               .runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
+                                               new UpdateDistance(), maxIterations);
+
+               // Extract the vertices as the result
+               DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
+
+               // emit result
+               if(fileOutput) {
+                       singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
+               } else {
+                       singleSourceShortestPaths.print();
+               }
+
+               env.execute("GSA Single Source Shortest Paths");
+       }
+
+       /**
+        * Initial vertex values: 0.0 for the source vertex, positive infinity for every other vertex.
+        */
+       @SuppressWarnings("serial")
+       private static final class InitVertices implements MapFunction<Long, Double>{
+
+               private final long srcId;
+
+               public InitVertices(long srcId) {
+                       this.srcId = srcId;
+               }
+
+               @Override
+               public Double map(Long id) {
+                       if (id.equals(srcId)) {
+                               return 0.0;
+                       }
+                       else {
+                               return Double.POSITIVE_INFINITY;
+                       }
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Single Source Shortest Path UDFs
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Gather: the candidate distance through a neighbor is the neighbor's distance plus the edge weight.
+        */
+       @SuppressWarnings("serial")
+       private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
+
+               @Override
+               public Double gather(Neighbor<Double, Double> neighbor) {
+                       return neighbor.getNeighborValue() + neighbor.getEdgeValue();
+               }
+       }
+
+       /**
+        * Sum: keeps the smaller of two candidate distances.
+        */
+       @SuppressWarnings("serial")
+       private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
+
+               @Override
+               public Double sum(Double newValue, Double currentValue) {
+                       return Math.min(newValue, currentValue);
+               }
+       }
+
+       /**
+        * Apply: calls setResult only when the best candidate improves on the current distance.
+        */
+       @SuppressWarnings("serial")
+       private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {
+
+               @Override
+               public void apply(Double newDistance, Double oldDistance) {
+                       if (newDistance < oldDistance) {
+                               setResult(newDistance);
+                       }
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Util methods
+       // --------------------------------------------------------------------------------------------
+
+       private static boolean fileOutput = false;
+
+       // uppercase suffix: a lowercase "l" is easily misread as the digit 1
+       private static long srcVertexId = 1L;
+
+       private static String edgesInputPath = null;
+
+       private static String outputPath = null;
+
+       private static int maxIterations = 5;
+
+       /**
+        * Reads <code>&lt;source vertex id&gt; &lt;input edges path&gt; &lt;output path&gt; &lt;num iterations&gt;</code>.
+        * With no arguments the defaults are kept and a usage hint is printed.
+        *
+        * @param args the program arguments
+        * @return false if a non-empty argument list does not have exactly four entries, true otherwise
+        */
+       private static boolean parseParameters(String[] args) {
+
+               if (args.length > 0) {
+                       if(args.length != 4) {
+                               System.err.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+                                               " <input edges path> <output path> <num iterations>");
+                               return false;
+                       }
+
+                       fileOutput = true;
+                       srcVertexId = Long.parseLong(args[0]);
+                       edgesInputPath = args[1];
+                       outputPath = args[2];
+                       maxIterations = Integer.parseInt(args[3]);
+               } else {
+                               System.out.println("Executing GSASingle Source Shortest Paths example "
+                                               + "with default parameters and built-in default data.");
+                               System.out.println("  Provide parameters to read input data from files.");
+                               System.out.println("  See the documentation for the correct format of input files.");
+                               System.out.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+                                               " <input edges path> <output path> <num iterations>");
+               }
+               return true;
+       }
+
+       /**
+        * Returns the weighted edges: read from a tab-separated (Long, Long, Double) file when
+        * parameters were given, otherwise the built-in default data set.
+        */
+       private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
+               if (fileOutput) {
+                       return env.readCsvFile(edgesInputPath)
+                                       .fieldDelimiter("\t")
+                                       .lineDelimiter("\n")
+                                       .types(Long.class, Long.class, Double.class)
+                                       .map(new Tuple3ToEdgeMap<Long, Double>());
+               } else {
+                       return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
+               }
+       }
+
+       @Override
+       public String getDescription() {
+               return "GSA Single Source Shortest Paths";
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
deleted file mode 100755
index 75cbd78..0000000
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
-
-/**
- * Implementation of the Single Source Shortest Paths algorithm using a gather-sum-apply (GSA) iteration.
- *
- * <p>
- * The source vertex starts with distance 0.0 and all other vertices with positive infinity.
- * In each superstep, CalculateDistances proposes neighbor distance + edge weight,
- * ChooseMinDistance keeps the minimum proposal and UpdateDistance adopts it only if it is
- * smaller than the vertex's current distance. At most maxIterations supersteps (default 5) are run.
- * </p>
- */
-public class GSASingleSourceShortestPathsExample implements ProgramDescription 
{
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Program
-       // 
--------------------------------------------------------------------------------------------
-
-       // Builds the graph, runs the GSA iteration and writes or prints the vertex distances.
-       public static void main(String[] args) throws Exception {
-
-               if(!parseParameters(args)) {
-                       return;
-               }
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
-
-               Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, 
new InitVertices(srcVertexId), env);
-
-               // Execute the GSA iteration
-               Graph<Long, Double, Double> result = graph
-                               .runGatherSumApplyIteration(new 
CalculateDistances(), new ChooseMinDistance(),
-                                               new UpdateDistance(), 
maxIterations);
-
-               // Extract the vertices as the result
-               DataSet<Vertex<Long, Double>> singleSourceShortestPaths = 
result.getVertices();
-
-               // emit result
-               if(fileOutput) {
-                       singleSourceShortestPaths.writeAsCsv(outputPath, "\n", 
" ");
-               } else {
-                       singleSourceShortestPaths.print();
-               }
-
-               env.execute("GSA Single Source Shortest Paths Example");
-       }
-
-       // Initial vertex values: 0.0 for the source vertex, positive infinity for all others.
-       @SuppressWarnings("serial")
-       private static final class InitVertices implements MapFunction<Long, 
Double>{
-
-               private long srcId;
-
-               public InitVertices(long srcId) {
-                       this.srcId = srcId;
-               }
-
-               public Double map(Long id) {
-                       if (id.equals(srcId)) {
-                               return 0.0;
-                       }
-                       else {
-                               return Double.POSITIVE_INFINITY;
-                       }
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Single Source Shortest Path UDFs
-       // 
--------------------------------------------------------------------------------------------
-
-       // Gather: candidate distance = neighbor's current distance + edge weight.
-       @SuppressWarnings("serial")
-       private static final class CalculateDistances extends 
GatherFunction<Double, Double, Double> {
-
-               public Double gather(Neighbor<Double, Double> neighbor) {
-                       return neighbor.getNeighborValue() + 
neighbor.getEdgeValue();
-               }
-       };
-
-       // Sum: keep the smaller of two candidate distances.
-       @SuppressWarnings("serial")
-       private static final class ChooseMinDistance extends 
SumFunction<Double, Double, Double> {
-
-               public Double sum(Double newValue, Double currentValue) {
-                       return Math.min(newValue, currentValue);
-               }
-       };
-
-       // Apply: call setResult only when the new distance improves on the current one.
-       @SuppressWarnings("serial")
-       private static final class UpdateDistance extends ApplyFunction<Long, 
Double, Double> {
-
-               public void apply(Double newDistance, Double oldDistance) {
-                       if (newDistance < oldDistance) {
-                               setResult(newDistance);
-                       }
-               }
-       };
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Util methods
-       // 
--------------------------------------------------------------------------------------------
-
-       private static boolean fileOutput = false;
-
-       private static Long srcVertexId = 1l;
-
-       private static String edgesInputPath = null;
-
-       private static String outputPath = null;
-
-       private static int maxIterations = 5;
-
-       // Reads <source vertex id> <input edges path> <output path> <num iterations>;
-       // returns false when a non-empty argument list does not have exactly four entries.
-       private static boolean parseParameters(String[] args) {
-
-               if (args.length > 0) {
-                       if(args.length != 4) {
-                               System.err.println("Usage: 
GSASingleSourceShortestPaths <source vertex id>" +
-                                               " <input edges path> <output 
path> <num iterations>");
-                               return false;
-                       }
-
-                       fileOutput = true;
-                       srcVertexId = Long.parseLong(args[0]);
-                       edgesInputPath = args[1];
-                       outputPath = args[2];
-                       maxIterations = Integer.parseInt(args[3]);
-               } else {
-                               System.out.println("Executing GSASingle Source 
Shortest Paths example "
-                                               + "with default parameters and 
built-in default data.");
-                               System.out.println("  Provide parameters to 
read input data from files.");
-                               System.out.println("  See the documentation for 
the correct format of input files.");
-                               System.out.println("Usage: 
GSASingleSourceShortestPaths <source vertex id>" +
-                                               " <input edges path> <output 
path> <num iterations>");
-               }
-               return true;
-       }
-
-       // Edges come from a tab-separated (Long, Long, Double) file when parameters were given,
-       // otherwise from the built-in default data set.
-       private static DataSet<Edge<Long, Double>> 
getEdgeDataSet(ExecutionEnvironment env) {
-               if (fileOutput) {
-                       return env.readCsvFile(edgesInputPath)
-                                       .fieldDelimiter("\t")
-                                       .lineDelimiter("\n")
-                                       .types(Long.class, Long.class, 
Double.class)
-                                       .map(new Tuple3ToEdgeMap<Long, 
Double>());
-               } else {
-                       return 
SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
-               }
-       }
-
-       @Override
-       public String getDescription() {
-               return "GSA Single Source Shortest Paths";
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
new file mode 100644
index 0000000..dddaf41
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.EdgesFunction;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+import java.util.HashSet;
+
+/**
+ * Given a directed, unweighted graph, return a weighted graph where the edge values are equal
+ * to the Jaccard similarity coefficient - the number of common neighbors divided by the size
+ * of the union of neighbor sets - for the src and target vertices.
+ * Neighbor sets are built from both the in- and out-edges of a vertex ({@link EdgeDirection#ALL}),
+ * so neighborhoods ignore edge direction.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <br>
+ *     Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs.
+ *     Edges themselves are separated by newlines; lines starting with <code>#</code> are ignored.
+ *     For example: <code>1    2\n1    3\n</code> defines two edges 1-2 and 1-3.
+ * </p>
+ *
+ * Usage <code> JaccardSimilarityMeasure &lt;edge path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData}
+ */
+@SuppressWarnings("serial")
+public class JaccardSimilarityMeasure implements ProgramDescription {
+
+       public static void main(String [] args) throws Exception {
+
+               if(!parseParameters(args)) {
+                       return;
+               }
+
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+               Graph<Long, NullValue, Double> graph = Graph.fromDataSet(edges, env);
+
+               // attach to every vertex the set of its neighbor ids, collected over in- and out-edges
+               DataSet<Vertex<Long, HashSet<Long>>> verticesWithNeighbors =
+                               graph.groupReduceOnEdges(new GatherNeighbors(), EdgeDirection.ALL);
+
+               Graph<Long, HashSet<Long>, Double> graphWithVertexValues = Graph.fromDataSet(verticesWithNeighbors, edges, env);
+
+               // the edge value will be the Jaccard similarity coefficient(number of common neighbors/ all neighbors)
+               DataSet<Tuple3<Long, Long, Double>> edgesWithJaccardWeight = graphWithVertexValues.getTriplets()
+                               .map(new WeighEdgesMapper());
+
+               // keep the newly computed coefficient (f1) as the edge value
+               DataSet<Edge<Long, Double>> result = graphWithVertexValues.joinWithEdges(edgesWithJaccardWeight,
+                               new MapFunction<Tuple2<Double, Double>, Double>() {
+
+                                       @Override
+                                       public Double map(Tuple2<Double, Double> value) throws Exception {
+                                               return value.f1;
+                                       }
+                               }).getEdges();
+
+               // emit result
+               if (fileOutput) {
+                       result.writeAsCsv(outputPath, "\n", ",");
+               } else {
+                       result.print();
+               }
+
+               env.execute("Executing Jaccard Similarity Measure");
+       }
+
+       @Override
+       public String getDescription() {
+               return "Vertex Jaccard Similarity Measure";
+       }
+
+       /**
+        * Each vertex will have a HashSet containing its neighbor ids as value.
+        */
+       private static final class GatherNeighbors implements EdgesFunction<Long, Double, Vertex<Long, HashSet<Long>>> {
+
+               @Override
+               public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Double>>> edges,
+                               Collector<Vertex<Long, HashSet<Long>>> out) throws Exception {
+
+                       HashSet<Long> neighborsHashSet = new HashSet<Long>();
+                       long vertexId = -1;
+
+                       // f0 holds the id of the vertex whose edges are iterated (getNeighborID relies on this)
+                       for(Tuple2<Long, Edge<Long, Double>> edge : edges) {
+                               neighborsHashSet.add(getNeighborID(edge));
+                               vertexId = edge.f0;
+                       }
+                       out.collect(new Vertex<Long, HashSet<Long>>(vertexId, neighborsHashSet));
+               }
+       }
+
+       /**
+        * The edge weight will be the Jaccard coefficient, which is computed as follows:
+        *
+        * Consider the edge x-y with neighbor sets X and Y.
+        * The size of the intersection of X and Y is counted by probing the larger set
+        * with every element of the smaller one.
+        * The size of the union then follows from |X| + |Y| - |intersection|.
+        *
+        * The Jaccard similarity coefficient is then, the intersection/union.
+        * The neighbor sets carried by the triplet are only read, never modified.
+        */
+       private static class WeighEdgesMapper implements MapFunction<Triplet<Long, HashSet<Long>, Double>,
+                       Tuple3<Long, Long, Double>> {
+
+               @Override
+               public Tuple3<Long, Long, Double> map(Triplet<Long, HashSet<Long>, Double> triplet)
+                               throws Exception {
+
+                       Vertex<Long, HashSet<Long>> source = triplet.getSrcVertex();
+                       Vertex<Long, HashSet<Long>> target = triplet.getTrgVertex();
+
+                       HashSet<Long> sourceNeighbors = source.getValue();
+                       HashSet<Long> targetNeighbors = target.getValue();
+
+                       // iterate the smaller set, look up in the larger one
+                       boolean sourceIsSmaller = sourceNeighbors.size() <= targetNeighbors.size();
+                       HashSet<Long> smaller = sourceIsSmaller ? sourceNeighbors : targetNeighbors;
+                       HashSet<Long> larger = sourceIsSmaller ? targetNeighbors : sourceNeighbors;
+
+                       long intersection = 0;
+                       for (Long neighbor : smaller) {
+                               if (larger.contains(neighbor)) {
+                                       intersection++;
+                               }
+                       }
+                       // |X union Y| = |X| + |Y| - |X intersect Y|
+                       long union = sourceNeighbors.size() + targetNeighbors.size() - intersection;
+
+                       return new Tuple3<Long, Long, Double>(source.getId(), target.getId(), (double) intersection/union);
+               }
+       }
+
+       /**
+        * Helper method that extracts the neighborId given an edge.
+        * @param edge a tuple whose f0 is the id of the current vertex and whose f1 is one of its edges
+        * @return the id of the edge endpoint that is not the current vertex (the vertex itself for a self-loop)
+        */
+       private static Long getNeighborID(Tuple2<Long, Edge<Long, Double>> edge) {
+               // compare by value: == on two boxed Longs compares references and only happens to
+               // work for ids inside the Long.valueOf cache range (-128..127)
+               if(edge.f1.getSource().equals(edge.f0)) {
+                       return edge.f1.getTarget();
+               } else {
+                       return edge.f1.getSource();
+               }
+       }
+
+       // *************************************************************************
+       // UTIL METHODS
+       // *************************************************************************
+
+       private static boolean fileOutput = false;
+       private static String edgeInputPath = null;
+       private static String outputPath = null;
+
+       /**
+        * Reads <code>&lt;edge path&gt; &lt;output path&gt;</code>; with no arguments the built-in data is used.
+        *
+        * @param args the program arguments
+        * @return false if a non-empty argument list does not have exactly two entries, true otherwise
+        */
+       private static boolean parseParameters(String [] args) {
+               if(args.length > 0) {
+                       if(args.length != 2) {
+                               System.err.println("Usage JaccardSimilarityMeasure <edge path> <output path>");
+                               return false;
+                       }
+
+                       fileOutput = true;
+                       edgeInputPath = args[0];
+                       outputPath = args[1];
+               } else {
+                       System.out.println("Executing JaccardSimilarityMeasure example with default parameters and built-in default data.");
+                       System.out.println("Provide parameters to read input data from files.");
+                       System.out.println("Usage JaccardSimilarityMeasure <edge path> <output path>");
+               }
+
+               return true;
+       }
+
+       /**
+        * Returns the input edges: read from a tab-separated file of (Long, Long) pairs when
+        * parameters were given, otherwise the built-in default data set.
+        */
+       private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+
+               if(fileOutput) {
+                       return env.readCsvFile(edgeInputPath)
+                                       .ignoreComments("#")
+                                       .fieldDelimiter("\t")
+                                       .lineDelimiter("\n")
+                                       .types(Long.class, Long.class)
+                                       .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
+                                               @Override
+                                               public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
+                                                       // placeholder value; replaced by the Jaccard coefficient in main
+                                                       return new Edge<Long, Double>(tuple2.f0, tuple2.f1, 0.0);
+                                               }
+                                       });
+               } else {
+                       return JaccardSimilarityMeasureData.getDefaultEdgeDataSet(env);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
deleted file mode 100644
index 2783a29..0000000
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.EdgesFunction;
-import org.apache.flink.graph.Triplet;
-import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
-
-import java.util.HashSet;
-
-/**
- * Given a directed, unweighted graph, return a weighted graph where the edge 
values are equal
- * to the Jaccard similarity coefficient - the number of common neighbors 
divided by the the size
- * of the union of neighbor sets - for the src and target vertices.
- *
- * <p>
- * Input files are plain text files and must be formatted as follows:
- * <br>
- *     Edges are represented by pairs of srcVertexId, trgVertexId separated by 
tabs.
- *     Edges themselves are separated by newlines.
- *     For example: <code>1    2\n1    3\n</code> defines two edges 1-2 and 
1-3.
- * </p>
- *
- * Usage <code> JaccardSimilarityMeasureExample &lt;edge path&gt; &lt;result 
path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData}
- */
-@SuppressWarnings("serial")
-public class JaccardSimilarityMeasureExample implements ProgramDescription {
-
-       public static void main(String [] args) throws Exception {
-
-               if(!parseParameters(args)) {
-                       return;
-               }
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
-
-               Graph<Long, NullValue, Double> graph = Graph.fromDataSet(edges, 
env);
-
-               DataSet<Vertex<Long, HashSet<Long>>> verticesWithNeighbors =
-                               graph.groupReduceOnEdges(new GatherNeighbors(), 
EdgeDirection.ALL);
-
-               Graph<Long, HashSet<Long>, Double> graphWithVertexValues = 
Graph.fromDataSet(verticesWithNeighbors, edges, env);
-
-               // the edge value will be the Jaccard similarity 
coefficient(number of common neighbors/ all neighbors)
-               DataSet<Tuple3<Long, Long, Double>> edgesWithJaccardWeight = 
graphWithVertexValues.getTriplets()
-                               .map(new WeighEdgesMapper());
-
-               DataSet<Edge<Long, Double>> result = 
graphWithVertexValues.joinWithEdges(edgesWithJaccardWeight,
-                               new MapFunction<Tuple2<Double, Double>, 
Double>() {
-
-                                       @Override
-                                       public Double map(Tuple2<Double, 
Double> value) throws Exception {
-                                               return value.f1;
-                                       }
-                               }).getEdges();
-
-               // emit result
-               if (fileOutput) {
-                       result.writeAsCsv(outputPath, "\n", ",");
-               } else {
-                       result.print();
-               }
-
-               env.execute("Executing Jaccard Similarity Measure");
-       }
-
-       @Override
-       public String getDescription() {
-               return "Vertex Jaccard Similarity Measure";
-       }
-
-       /**
-        * Each vertex will have a HashSet containing its neighbor ids as value.
-        */
-       private static final class GatherNeighbors implements 
EdgesFunction<Long, Double, Vertex<Long, HashSet<Long>>> {
-
-               @Override
-               public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, 
Double>>> edges,
-                                                                               
                                Collector<Vertex<Long, HashSet<Long>>> out) 
throws Exception {
-
-                       HashSet<Long> neighborsHashSet = new HashSet<Long>();
-                       long vertexId = -1;
-
-                       for(Tuple2<Long, Edge<Long, Double>> edge : edges) {
-                               neighborsHashSet.add(getNeighborID(edge));
-                               vertexId = edge.f0;
-                       }
-                       out.collect(new Vertex<Long, HashSet<Long>>(vertexId, 
neighborsHashSet));
-               }
-       }
-
-       /**
-        * The edge weight will be the Jaccard coefficient, which is computed 
as follows:
-        *
-        * Consider the edge x-y
-        * We denote by sizeX and sizeY, the neighbors hash set size of x and y 
respectively.
-        * sizeX+sizeY = union + intersection of neighborhoods
-        * size(hashSetX.addAll(hashSetY)).distinct = union of neighborhoods
-        * The intersection can then be deduced.
-        *
-        * The Jaccard similarity coefficient is then, the intersection/union.
-        */
-       private static class WeighEdgesMapper implements 
MapFunction<Triplet<Long, HashSet<Long>, Double>,
-                       Tuple3<Long, Long, Double>> {
-
-               @Override
-               public Tuple3<Long, Long, Double> map(Triplet<Long, 
HashSet<Long>, Double> triplet)
-                               throws Exception {
-
-                       Vertex<Long, HashSet<Long>> source = 
triplet.getSrcVertex();
-                       Vertex<Long, HashSet<Long>> target = 
triplet.getTrgVertex();
-
-                       long unionPlusIntersection = source.getValue().size() + 
target.getValue().size();
-                       // within a HashSet, all elements are distinct
-                       source.getValue().addAll(target.getValue());
-                       // the source value contains the union
-                       long union = source.getValue().size();
-                       long intersection = unionPlusIntersection - union;
-
-                       return new Tuple3<Long, Long, Double>(source.getId(), 
target.getId(), (double) intersection/union);
-               }
-       }
-
-       /**
-        * Helper method that extracts the neighborId given an edge.
-        * @param edge
-        * @return
-        */
-       private static Long getNeighborID(Tuple2<Long, Edge<Long, Double>> 
edge) {
-               if(edge.f1.getSource() == edge.f0) {
-                       return edge.f1.getTarget();
-               } else {
-                       return edge.f1.getSource();
-               }
-       }
-
-       // 
*************************************************************************
-       // UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileOutput = false;
-       private static String edgeInputPath = null;
-       private static String outputPath = null;
-
-       private static boolean parseParameters(String [] args) {
-               if(args.length > 0) {
-                       if(args.length != 2) {
-                               System.err.println("Usage 
JaccardSimilarityMeasureExample <edge path> <output path>");
-                               return false;
-                       }
-
-                       fileOutput = true;
-                       edgeInputPath = args[0];
-                       outputPath = args[1];
-               } else {
-                       System.out.println("Executing JaccardSimilarityMeasure 
example with default parameters and built-in default data.");
-                       System.out.println("Provide parameters to read input 
data from files.");
-                       System.out.println("Usage 
JaccardSimilarityMeasureExample <edge path> <output path>");
-               }
-
-               return true;
-       }
-
-       private static DataSet<Edge<Long, Double>> 
getEdgesDataSet(ExecutionEnvironment env) {
-
-               if(fileOutput) {
-                       return env.readCsvFile(edgeInputPath)
-                                       .ignoreComments("#")
-                                       .fieldDelimiter("\t")
-                                       .lineDelimiter("\n")
-                                       .types(Long.class, Long.class)
-                                       .map(new MapFunction<Tuple2<Long, 
Long>, Edge<Long, Double>>() {
-                                               @Override
-                                               public Edge<Long, Double> 
map(Tuple2<Long, Long> tuple2) throws Exception {
-                                                       return new Edge<Long, 
Double>(tuple2.f0, tuple2.f1, new Double(0));
-                                               }
-                                       });
-               } else {
-                       return 
JaccardSimilarityMeasureData.getDefaultEdgeDataSet(env);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java
new file mode 100644
index 0000000..4012a4e
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.library.LabelPropagationAlgorithm;
+import org.apache.flink.graph.utils.Tuple2ToVertexMap;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * This example uses the label propagation algorithm to detect communities by
+ * propagating labels. Initially, each vertex is assigned its id as its label.
+ * The vertices iteratively propagate their labels to their neighbors and adopt
+ * the most frequent label among their neighbors. The algorithm converges when
+ * no vertex changes value or the maximum number of iterations have been
+ * reached.
+ *
+ * The edges input file is expected to contain one edge per line, with long IDs
+ * in the following format:"<sourceVertexID>\t<targetVertexID>".
+ *
+ * The vertices input file is expected to contain one vertex per line, with 
long IDs
+ * and long vertex values, in the following format:"<vertexID>\t<vertexValue>".
+ *
+ * If no arguments are provided, the example runs with a random graph of 100 
vertices.
+ */
+public class LabelPropagation implements ProgramDescription {
+
+       public static void main(String[] args) throws Exception {
+
+               if(!parseParameters(args)) {
+                       return;
+               }
+
+               // Set up the execution environment
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               // Set up the graph
+               DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
+               DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
+
+               Graph<Long, Long, NullValue> graph = 
Graph.fromDataSet(vertices, edges, env);
+
+               // Set up the program
+               DataSet<Vertex<Long, Long>> verticesWithCommunity = graph.run(
+                               new 
LabelPropagationAlgorithm<Long>(maxIterations)).getVertices();
+
+               // Emit results
+               if(fileOutput) {
+                       verticesWithCommunity.writeAsCsv(outputPath, "\n", ",");
+               } else {
+                       verticesWithCommunity.print();
+               }
+
+               // Execute the program
+               env.execute("Label Propagation Example");
+       }
+
+       // 
*************************************************************************
+       //     UTIL METHODS
+       // 
*************************************************************************
+
+       private static boolean fileOutput = false;
+       private static String vertexInputPath = null;
+       private static String edgeInputPath = null;
+       private static String outputPath = null;
+       private static long numVertices = 100;
+       private static int maxIterations = 10;
+
+       private static boolean parseParameters(String[] args) {
+
+               if(args.length > 0) {
+                       if(args.length != 4) {
+                               System.err.println("Usage: LabelPropagation 
<vertex path> <edge path> <output path> <num iterations>");
+                               return false;
+                       }
+
+                       fileOutput = true;
+                       vertexInputPath = args[0];
+                       edgeInputPath = args[1];
+                       outputPath = args[2];
+                       maxIterations = Integer.parseInt(args[3]);
+               } else {
+                       System.out.println("Executing LabelPropagation example 
with default parameters and built-in default data.");
+                       System.out.println("  Provide parameters to read input 
data from files.");
+                       System.out.println("  See the documentation for the 
correct format of input files.");
+                       System.out.println("  Usage: LabelPropagation <vertex 
path> <edge path> <output path> <num iterations>");
+               }
+               return true;
+       }
+
+       @SuppressWarnings("serial")
+       private static DataSet<Vertex<Long, Long>> 
getVertexDataSet(ExecutionEnvironment env) {
+
+               if (fileOutput) {
+                       return env.readCsvFile(vertexInputPath)
+                                       .fieldDelimiter("\t")
+                                       .lineDelimiter("\n")
+                                       .types(Long.class, Long.class)
+                                       .map(new Tuple2ToVertexMap<Long, 
Long>());
+               }
+
+               return env.generateSequence(1, numVertices).map(
+                               new MapFunction<Long, Vertex<Long, Long>>() {
+                                       public Vertex<Long, Long> map(Long l) 
throws Exception {
+                                               return new Vertex<Long, 
Long>(l, l);
+                                       }
+                               });
+       }
+
+       @SuppressWarnings("serial")
+       private static DataSet<Edge<Long, NullValue>> 
getEdgeDataSet(ExecutionEnvironment env) {
+
+               if (fileOutput) {
+                       return env.readCsvFile(edgeInputPath)
+                                       .fieldDelimiter("\t")
+                                       .lineDelimiter("\n")
+                                       .types(Long.class, Long.class)
+                                       .map(new MapFunction<Tuple2<Long, 
Long>, Edge<Long, NullValue>>() {
+                                               @Override
+                                               public Edge<Long, NullValue> 
map(Tuple2<Long, Long> value) throws Exception {
+                                                       return new Edge<Long, 
NullValue>(value.f0, value.f1, NullValue.getInstance());
+                                               }
+                                       });
+               }
+
+               return env.generateSequence(1, numVertices).flatMap(
+                               new FlatMapFunction<Long, Edge<Long, 
NullValue>>() {
+                                       @Override
+                                       public void flatMap(Long key,
+                                                       Collector<Edge<Long, 
NullValue>> out) {
+                                               int numOutEdges = (int) 
(Math.random() * (numVertices / 2));
+                                               for (int i = 0; i < 
numOutEdges; i++) {
+                                                       long target = (long) 
(Math.random() * numVertices) + 1;
+                                                       out.collect(new 
Edge<Long, NullValue>(key, target,
+                                                                       
NullValue.getInstance()));
+                                               }
+                                       }
+                               });
+       }
+
+       @Override
+       public String getDescription() {
+               return "Label Propagation Example";
+       }
+}
\ No newline at end of file

Reply via email to