Repository: flink
Updated Branches:
  refs/heads/master bbd97354b -> 9f7110748


[FLINK-2663] [gelly] Updated Gelly library methods to use generic key types

This squashes the following commits:

[gelly] Added missing Javadocs to GSA classes

[FLINK-2663] [gelly] Updated Gelly library methods to also use generic 
vertex/edge values where possible

This closes #1152


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f711074
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f711074
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f711074

Branch: refs/heads/master
Commit: 9f7110748434d2bdea96f16c70f4f0894261ae69
Parents: bbd9735
Author: vasia <va...@apache.org>
Authored: Sun Sep 20 21:20:11 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Thu Oct 1 21:22:35 2015 +0200

----------------------------------------------------------------------
 .../graph/example/ConnectedComponents.java      |   2 +-
 .../flink/graph/example/MusicProfiles.java      |   3 +-
 .../apache/flink/graph/gsa/ApplyFunction.java   |  15 +++
 .../apache/flink/graph/gsa/GatherFunction.java  |  16 +++
 .../org/apache/flink/graph/gsa/SumFunction.java |  18 +++-
 .../flink/graph/library/CommunityDetection.java |  53 ++++++----
 .../graph/library/ConnectedComponents.java      |  32 +++---
 .../graph/library/GSAConnectedComponents.java   |  22 ++--
 .../apache/flink/graph/library/GSAPageRank.java |   8 +-
 .../library/GSASingleSourceShortestPaths.java   |   8 +-
 .../flink/graph/library/GSATriangleCount.java   | 102 +++++++++----------
 .../flink/graph/library/LabelPropagation.java   |  12 ++-
 .../apache/flink/graph/library/PageRank.java    |   8 +-
 .../library/SingleSourceShortestPaths.java      |   8 +-
 .../flink/graph/utils/NullValueEdgeMapper.java  |  32 ++++++
 .../apache/flink/graph/gsa/GSACompilerTest.java |   1 -
 .../flink/graph/gsa/GSATranslationTest.java     |   1 -
 .../flink/graph/test/GatherSumApplyITCase.java  |   8 +-
 .../test/library/CommunityDetectionITCase.java  |   4 +-
 ...ctedComponentsWithRandomisedEdgesITCase.java |   3 +-
 .../test/library/LabelPropagationITCase.java    |   8 +-
 .../graph/test/library/PageRankITCase.java      |   8 +-
 .../graph/test/library/TriangleCountITCase.java |   6 +-
 .../test/operations/GraphOperationsITCase.java  |   1 -
 24 files changed, 242 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
index bd08190..4189602 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
@@ -69,7 +69,7 @@ public class ConnectedComponents implements 
ProgramDescription {
                }, env);
 
                DataSet<Vertex<Long, Long>> verticesWithMinIds = graph
-                               .run(new 
GSAConnectedComponents(maxIterations)).getVertices();
+                               .run(new GSAConnectedComponents<Long, 
NullValue>(maxIterations));
 
                // emit result
                if (fileOutput) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
index a56224d..e347bc5 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
@@ -153,8 +153,7 @@ public class MusicProfiles implements ProgramDescription {
                                                        public Long 
map(Tuple2<Long, Long> value) {
                                                                return value.f1;
                                                        }
-                                               }).run(new 
LabelPropagation<String>(maxIterations))
-                               .getVertices();
+                                               }).run(new 
LabelPropagation<String, NullValue>(maxIterations));
 
                if (fileOutput) {
                        verticesWithCommunity.writeAsCsv(communitiesOutputPath, 
"\n", "\t");

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
index ed0cf70..5a8e97a 100755
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
@@ -27,6 +27,13 @@ import org.apache.flink.util.Collector;
 import java.io.Serializable;
 import java.util.Collection;
 
+/**
+ * The base class for the third and last step of a {@link 
GatherSumApplyIteration}.
+ *
+ * @param <K> the vertex ID type
+ * @param <VV> the vertex value type
+ * @param <M> the input type (produced by the Sum phase)
+ */
 @SuppressWarnings("serial")
 public abstract class ApplyFunction<K, VV, M> implements Serializable {
 
@@ -51,6 +58,14 @@ public abstract class ApplyFunction<K, VV, M> implements 
Serializable {
 
        
//---------------------------------------------------------------------------------------------
 
+       /**
+        * This method is invoked once per superstep, after the {@link 
SumFunction} 
+        * in a {@link GatherSumApplyIteration}.
+        * It updates the Vertex values.
+        * 
+        * @param newValue the value computed during the current superstep.
+        * @param currentValue the current Vertex value.
+        */
        public abstract void apply(M newValue, VV currentValue);
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
index 5a09a5a..563b20e 100755
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
@@ -25,6 +25,13 @@ import org.apache.flink.types.Value;
 import java.io.Serializable;
 import java.util.Collection;
 
+/**
+ * The base class for the first step of a {@link GatherSumApplyIteration}.
+ * 
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ * @param <M> the output type 
+ */
 @SuppressWarnings("serial")
 public abstract class GatherFunction<VV, EV, M> implements Serializable {
 
@@ -49,6 +56,15 @@ public abstract class GatherFunction<VV, EV, M> implements 
Serializable {
 
        
//---------------------------------------------------------------------------------------------
 
+       /**
+        * This method is invoked once per superstep, for each {@link Neighbor} 
of each Vertex 
+        * in the beginning of each superstep in a {@link 
GatherSumApplyIteration}.
+        * It needs to produce a partial value, which will be combined with 
other partial value
+        * in the next phase of the iteration.
+        *  
+        * @param neighbor the input Neighbor. It provides access to the source 
Vertex and the Edge objects.
+        * @return a partial result to be combined in the Sum phase.
+        */
        public abstract M gather(Neighbor<VV, EV> neighbor);
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
index 69baae4..f27e275 100755
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
@@ -25,6 +25,13 @@ import org.apache.flink.types.Value;
 import java.io.Serializable;
 import java.util.Collection;
 
+/**
+ * The base class for the second step of a {@link GatherSumApplyIteration}.
+ *
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ * @param <M> the output type
+ */
 @SuppressWarnings("serial")
 public abstract class SumFunction<VV, EV, M> implements Serializable {
 
@@ -48,7 +55,16 @@ public abstract class SumFunction<VV, EV, M> implements 
Serializable {
        }
 
        
//---------------------------------------------------------------------------------------------
-
+       /**
+        * This method is invoked once per superstep, after the {@link 
GatherFunction} 
+        * in a {@link GatherSumApplyIteration}.
+        * It combines the partial values produced by {@link 
GatherFunction#gather(Neighbor)}
+        * in pairs, until a single value has been computed.
+        * 
+        * @param arg0 the first partial value.
+        * @param arg1 the second partial value.
+        * @return the combined value.
+        */
        public abstract M sum(M arg0, M arg1);
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
index 31488ee..0dd39fc 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
@@ -19,6 +19,8 @@
 package org.apache.flink.graph.library;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
@@ -34,18 +36,21 @@ import java.util.TreeMap;
 /**
  * Community Detection Algorithm.
  *
- * Initially, each vertex is assigned a tuple formed of its own id along with 
a score equal to 1.0, as value.
+ * This implementation expects Long Vertex values and labels. The Vertex 
values of the input Graph provide the initial label assignments.
+ * 
+ * Initially, each vertex is assigned a tuple formed of its own initial value 
along with a score equal to 1.0.
  * The vertices propagate their labels and max scores in iterations, each time 
adopting the label with the
  * highest score from the list of received messages. The chosen label is 
afterwards re-scored using the fraction
  * delta/the superstep number. Delta is passed as a parameter and has 0.5 as a 
default value.
  *
  * The algorithm converges when vertices no longer update their value or when 
the maximum number of iterations
  * is reached.
+ * 
+ * @param <K> the Vertex ID type 
  *
  * @see <a href="http://arxiv.org/pdf/0808.2633.pdf";>article explaining the 
algorithm in detail</a>
  */
-public class CommunityDetection implements
-       GraphAlgorithm<Long, Long, Double, Graph<Long, Long, Double>> {
+public class CommunityDetection<K> implements GraphAlgorithm<K, Long, Double, 
Graph<K, Long, Double>> {
 
        private Integer maxIterations;
 
@@ -58,20 +63,22 @@ public class CommunityDetection implements
        }
 
        @Override
-       public Graph<Long, Long, Double> run(Graph<Long, Long, Double> graph) {
+       public Graph<K, Long, Double> run(Graph<K, Long, Double> graph) {
 
-               Graph<Long, Long, Double> undirectedGraph = 
graph.getUndirected();
+               DataSet<Vertex<K, Tuple2<Long, Double>>> initializedVertices = 
graph.getVertices()
+                               .map(new AddScoreToVertexValuesMapper<K>());
 
-               Graph<Long, Tuple2<Long, Double>, Double> 
graphWithScoredVertices = undirectedGraph
-                               .mapVertices(new 
AddScoreToVertexValuesMapper());
+               Graph<K, Tuple2<Long, Double>, Double> graphWithScoredVertices =
+                               Graph.fromDataSet(initializedVertices, 
graph.getEdges(), graph.getContext()).getUndirected();
 
-               return graphWithScoredVertices.runVertexCentricIteration(new 
VertexLabelUpdater(delta),
-                               new LabelMessenger(), maxIterations)
-                               .mapVertices(new 
RemoveScoreFromVertexValuesMapper());
+               return graphWithScoredVertices.runVertexCentricIteration(new 
VertexLabelUpdater<K>(delta),
+                               new LabelMessenger<K>(), maxIterations)
+                               .mapVertices(new 
RemoveScoreFromVertexValuesMapper<K>());
        }
 
        @SuppressWarnings("serial")
-       public static final class VertexLabelUpdater extends 
VertexUpdateFunction<Long, Tuple2<Long, Double>, Tuple2<Long, Double>> {
+       public static final class VertexLabelUpdater<K> extends 
VertexUpdateFunction<
+               K, Tuple2<Long, Double>, Tuple2<Long, Double>> {
 
                private Double delta;
 
@@ -80,7 +87,7 @@ public class CommunityDetection implements
                }
 
                @Override
-               public void updateVertex(Vertex<Long, Tuple2<Long, Double>> 
vertex,
+               public void updateVertex(Vertex<K, Tuple2<Long, Double>> vertex,
                                                                
MessageIterator<Tuple2<Long, Double>> inMessages) throws Exception {
 
                        // we would like these two maps to be ordered
@@ -140,34 +147,36 @@ public class CommunityDetection implements
        }
 
        @SuppressWarnings("serial")
-       public static final class LabelMessenger extends 
MessagingFunction<Long, Tuple2<Long, Double>,
+       public static final class LabelMessenger<K> extends 
MessagingFunction<K, Tuple2<Long, Double>,
                        Tuple2<Long, Double>, Double> {
 
                @Override
-               public void sendMessages(Vertex<Long, Tuple2<Long, Double>> 
vertex) throws Exception {
+               public void sendMessages(Vertex<K, Tuple2<Long, Double>> 
vertex) throws Exception {
 
-                       for(Edge<Long, Double> edge : getEdges()) {
+                       for(Edge<K, Double> edge : getEdges()) {
                                sendMessageTo(edge.getTarget(), new 
Tuple2<Long, Double>(vertex.getValue().f0,
                                                vertex.getValue().f1 * 
edge.getValue()));
                        }
-
                }
        }
 
        @SuppressWarnings("serial")
-       public static final class AddScoreToVertexValuesMapper implements 
MapFunction<Vertex<Long, Long>, Tuple2<Long, Double>> {
+       @ForwardedFields("f0")
+       public static final class AddScoreToVertexValuesMapper<K> implements 
MapFunction<
+               Vertex<K, Long>, Vertex<K, Tuple2<Long, Double>>> {
 
-               @Override
-               public Tuple2<Long, Double> map(Vertex<Long, Long> vertex) 
throws Exception {
-                       return new Tuple2<Long, Double>(vertex.getValue(), 1.0);
+               public Vertex<K, Tuple2<Long, Double>> map(Vertex<K, Long> 
vertex) {
+                       return new Vertex<K, Tuple2<Long, Double>>(
+                                       vertex.getId(), new Tuple2<Long, 
Double>(vertex.getValue(), 1.0));
                }
        }
 
        @SuppressWarnings("serial")
-       public static final class RemoveScoreFromVertexValuesMapper implements 
MapFunction<Vertex<Long, Tuple2<Long, Double>>, Long> {
+       public static final class RemoveScoreFromVertexValuesMapper<K> 
implements MapFunction<
+               Vertex<K, Tuple2<Long, Double>>, Long> {
 
                @Override
-               public Long map(Vertex<Long, Tuple2<Long, Double>> vertex) 
throws Exception {
+               public Long map(Vertex<K, Tuple2<Long, Double>> vertex) throws 
Exception {
                        return vertex.getValue().f0;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
index 871f315..ed853fe 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
@@ -18,27 +18,32 @@
 
 package org.apache.flink.graph.library;
 
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
 import org.apache.flink.types.NullValue;
 
 /**
- * A vertex-centric implementation of the Connected components algorithm.
+ * A vertex-centric implementation of the Connected Components algorithm.
  *
- * Initially, each vertex will have its own ID as a value(is its own 
component). The vertices propagate their
- * current component ID in iterations, each time adopting a new value from the 
received neighbor IDs,
+ * This implementation assumes that the vertices of the input Graph are 
initialized with unique, Long component IDs.
+ * The vertices propagate their current component ID in iterations, each time 
adopting a new value from the received neighbor IDs,
  * provided that the value is less than the current minimum.
  *
  * The algorithm converges when vertices no longer update their value or when 
the maximum number of iterations
  * is reached.
+ * 
+ * The result is a DataSet of vertices, where the vertex value corresponds to 
the assigned component ID.
+ * 
+ * @see {@link org.apache.flink.graph.library.GSAConnectedComponents}
  */
 @SuppressWarnings("serial")
-public class ConnectedComponents implements
-       GraphAlgorithm<Long, Long, NullValue, Graph<Long, Long, NullValue>> {
+public class ConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, EV, 
DataSet<Vertex<K, Long>>> {
 
        private Integer maxIterations;
 
@@ -47,21 +52,24 @@ public class ConnectedComponents implements
        }
 
        @Override
-       public Graph<Long, Long, NullValue> run(Graph<Long, Long, NullValue> 
graph) throws Exception {
+       public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> graph) throws 
Exception {
 
-               Graph<Long, Long, NullValue> undirectedGraph = 
graph.getUndirected();
+               Graph<K, Long, NullValue> undirectedGraph = graph.mapEdges(new 
NullValueEdgeMapper<K, EV>())
+                               .getUndirected();
 
                // initialize vertex values and run the Vertex Centric Iteration
-               return undirectedGraph.runVertexCentricIteration(new 
CCUpdater(), new CCMessenger(), maxIterations);
+               return undirectedGraph.runVertexCentricIteration(
+                               new CCUpdater<K>(), new CCMessenger<K>(), 
maxIterations)
+                               .getVertices();
        }
 
        /**
         * Updates the value of a vertex by picking the minimum neighbor ID out 
of all the incoming messages.
         */
-       public static final class CCUpdater extends VertexUpdateFunction<Long, 
Long, Long> {
+       public static final class CCUpdater<K> extends VertexUpdateFunction<K, 
Long, Long> {
 
                @Override
-               public void updateVertex(Vertex<Long, Long> vertex, 
MessageIterator<Long> messages) throws Exception {
+               public void updateVertex(Vertex<K, Long> vertex, 
MessageIterator<Long> messages) throws Exception {
                        long min = Long.MAX_VALUE;
 
                        for (long msg : messages) {
@@ -78,10 +86,10 @@ public class ConnectedComponents implements
        /**
         * Distributes the minimum ID associated with a given vertex among all 
the target vertices.
         */
-       public static final class CCMessenger extends MessagingFunction<Long, 
Long, Long, NullValue> {
+       public static final class CCMessenger<K> extends MessagingFunction<K, 
Long, Long, NullValue> {
 
                @Override
-               public void sendMessages(Vertex<Long, Long> vertex) throws 
Exception {
+               public void sendMessages(Vertex<K, Long> vertex) throws 
Exception {
                        // send current minimum to neighbors
                        sendMessageToAllNeighbors(vertex.getValue());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
index f852f3e..77bc2cf 100755
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
@@ -18,19 +18,25 @@
 
 package org.apache.flink.graph.library;
 
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
 import org.apache.flink.types.NullValue;
 
 /**
  * This is an implementation of the Connected Components algorithm, using a 
gather-sum-apply iteration.
+ * This implementation assumes that the vertices of the input Graph are 
initialized with unique, Long component IDs.
+ * The result is a DataSet of vertices, where the vertex value corresponds to 
the assigned component ID.
+ * 
+ * @see {@link org.apache.flink.graph.library.ConnectedComponents}
  */
-public class GSAConnectedComponents implements
-       GraphAlgorithm<Long, Long, NullValue, Graph<Long, Long, NullValue>> {
+public class GSAConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, 
EV, DataSet<Vertex<K, Long>>> {
 
        private Integer maxIterations;
 
@@ -39,13 +45,15 @@ public class GSAConnectedComponents implements
        }
 
        @Override
-       public Graph<Long, Long, NullValue> run(Graph<Long, Long, NullValue> 
graph) throws Exception {
+       public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> graph) throws 
Exception {
 
-               Graph<Long, Long, NullValue> undirectedGraph = 
graph.getUndirected();
+               Graph<K, Long, NullValue> undirectedGraph = graph.mapEdges(new 
NullValueEdgeMapper<K, EV>())
+                               .getUndirected();
 
                // initialize vertex values and run the Vertex Centric Iteration
-               return undirectedGraph.runGatherSumApplyIteration(new 
GatherNeighborIds(), new SelectMinId(), new UpdateComponentId(),
-                               maxIterations);
+               return undirectedGraph.runGatherSumApplyIteration(
+                               new GatherNeighborIds(), new SelectMinId(), new 
UpdateComponentId<K>(),
+                               maxIterations).getVertices();
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -69,7 +77,7 @@ public class GSAConnectedComponents implements
        };
 
        @SuppressWarnings("serial")
-       private static final class UpdateComponentId extends 
ApplyFunction<Long, Long, Long> {
+       private static final class UpdateComponentId<K> extends 
ApplyFunction<K, Long, Long> {
 
                public void apply(Long summedValue, Long origValue) {
                        if (summedValue < origValue) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
index 6ce2ed6..df3e89a 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.Neighbor;
@@ -36,7 +37,7 @@ import org.apache.flink.graph.gsa.SumFunction;
  * 
  * The implementation assumes that each page has at least one incoming and one 
outgoing link.
  */
-public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, 
Graph<K, Double, Double>> {
+public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, 
DataSet<Vertex<K, Double>>> {
 
        private double beta;
        private int maxIterations;
@@ -58,7 +59,7 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, 
Double, Double, Graph<K
        }
 
        @Override
-       public Graph<K, Double, Double> run(Graph<K, Double, Double> network) 
throws Exception {
+       public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) 
throws Exception {
 
                if (numberOfVertices == 0) {
                        numberOfVertices = network.numberOfVertices();
@@ -70,7 +71,8 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, 
Double, Double, Graph<K
                                .joinWithEdgesOnSource(vertexOutDegrees, new 
InitWeightsMapper());
 
                return networkWithWeights.runGatherSumApplyIteration(new 
GatherRanks(numberOfVertices), new SumRanks(),
-                               new UpdateRanks<K>(beta, numberOfVertices), 
maxIterations);
+                               new UpdateRanks<K>(beta, numberOfVertices), 
maxIterations)
+                               .getVertices();
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
index 18bdd1d..5a76072 100755
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.library;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
@@ -31,7 +32,7 @@ import org.apache.flink.graph.gsa.Neighbor;
  * This is an implementation of the Single Source Shortest Paths algorithm, 
using a gather-sum-apply iteration
  */
 public class GSASingleSourceShortestPaths<K> implements
-       GraphAlgorithm<K, Double, Double, Graph<K, Double, Double>> {
+       GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
 
        private final K srcVertexId;
        private final Integer maxIterations;
@@ -42,11 +43,12 @@ public class GSASingleSourceShortestPaths<K> implements
        }
 
        @Override
-       public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {
+       public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
 
                return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
                                .runGatherSumApplyIteration(new 
CalculateDistances(), new ChooseMinDistance(),
-                                               new UpdateDistance<K>(), 
maxIterations);
+                                               new UpdateDistance<K>(), 
maxIterations)
+                                               .getVertices();
        }
 
        @SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
index 3d4d902..76d170d 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.ReduceNeighborsFunction;
@@ -46,62 +45,61 @@ import java.util.TreeMap;
  *
  * This implementation is non - iterative.
  *
- * The algorithm takes an undirected, unweighted graph as input and outputs a 
DataSet of
- * Tuple1 which contains a single integer representing the number of triangles.
+ * The algorithm takes an undirected, unweighted graph as input and outputs a 
DataSet
+ * which contains a single integer representing the number of triangles.
  */
-public class GSATriangleCount implements
-               GraphAlgorithm<Long, NullValue, NullValue, 
DataSet<Tuple1<Integer>>> {
+public class GSATriangleCount<K extends Comparable<K>, VV, EV> implements
+               GraphAlgorithm<K, VV, EV, DataSet<Integer>> {
 
        @SuppressWarnings("serial")
        @Override
-       public DataSet<Tuple1<Integer>> run(Graph<Long, NullValue, NullValue> 
input) throws Exception {
+       public DataSet<Integer> run(Graph<K, VV, EV> input) throws Exception {
 
                ExecutionEnvironment env = input.getContext();
 
                // order the edges so that src is always higher than trg
-               DataSet<Edge<Long, NullValue>> edges = input.getEdges()
-                               .map(new OrderEdges()).distinct();
+               DataSet<Edge<K, NullValue>> edges = input.getEdges().map(new 
OrderEdges<K, EV>()).distinct();
 
-               Graph<Long, TreeMap<Long, Integer>, NullValue> graph = 
Graph.fromDataSet(edges,
-                               new VertexInitializer(), env);
+               Graph<K, TreeMap<K, Integer>, NullValue> graph = 
Graph.fromDataSet(edges,
+                               new VertexInitializer<K>(), env);
 
                // select neighbors with ids higher than the current vertex id
                // Gather: a no-op in this case
                // Sum: create the set of neighbors
-               DataSet<Tuple2<Long, TreeMap<Long, Integer>>> higherIdNeighbors 
=
-                               graph.reduceOnNeighbors(new 
GatherHigherIdNeighbors(), EdgeDirection.IN);
+               DataSet<Tuple2<K, TreeMap<K, Integer>>> higherIdNeighbors =
+                               graph.reduceOnNeighbors(new 
GatherHigherIdNeighbors<K>(), EdgeDirection.IN);
 
-               Graph<Long, TreeMap<Long, Integer>, NullValue> 
graphWithReinitializedVertexValues =
-                               graph.mapVertices(new 
VertexInitializerEmptyTreeMap());
+               Graph<K, TreeMap<K, Integer>, NullValue> 
graphWithReinitializedVertexValues =
+                               graph.mapVertices(new 
VertexInitializerEmptyTreeMap<K>());
 
                // Apply: attach the computed values to the vertices
                // joinWithVertices to update the node values
-               DataSet<Vertex<Long, TreeMap<Long, Integer>>> 
verticesWithHigherIdNeighbors =
-                               
graphWithReinitializedVertexValues.joinWithVertices(higherIdNeighbors, new 
AttachValues()).getVertices();
+               DataSet<Vertex<K, TreeMap<K, Integer>>> 
verticesWithHigherIdNeighbors =
+                               
graphWithReinitializedVertexValues.joinWithVertices(higherIdNeighbors, new 
AttachValues<K>()).getVertices();
 
-               Graph<Long, TreeMap<Long,Integer>, NullValue> 
graphWithNeighbors = Graph.fromDataSet(verticesWithHigherIdNeighbors,
+               Graph<K, TreeMap<K,Integer>, NullValue> graphWithNeighbors = 
Graph.fromDataSet(verticesWithHigherIdNeighbors,
                                edges, env);
 
                // propagate each received value to neighbors with higher id
                // Gather: a no-op in this case
                // Sum: propagate values
-               DataSet<Tuple2<Long, TreeMap<Long, Integer>>> propagatedValues 
= graphWithNeighbors
-                               .reduceOnNeighbors(new 
GatherHigherIdNeighbors(), EdgeDirection.IN);
+               DataSet<Tuple2<K, TreeMap<K, Integer>>> propagatedValues = 
graphWithNeighbors
+                               .reduceOnNeighbors(new 
GatherHigherIdNeighbors<K>(), EdgeDirection.IN);
 
                // Apply: attach propagated values to vertices
-               DataSet<Vertex<Long, TreeMap<Long, Integer>>> 
verticesWithPropagatedValues =
-                               
graphWithReinitializedVertexValues.joinWithVertices(propagatedValues, new 
AttachValues()).getVertices();
+               DataSet<Vertex<K, TreeMap<K, Integer>>> 
verticesWithPropagatedValues =
+                               
graphWithReinitializedVertexValues.joinWithVertices(propagatedValues, new 
AttachValues<K>()).getVertices();
 
-               Graph<Long, TreeMap<Long, Integer>, NullValue> 
graphWithPropagatedNeighbors =
+               Graph<K, TreeMap<K, Integer>, NullValue> 
graphWithPropagatedNeighbors =
                                Graph.fromDataSet(verticesWithPropagatedValues, 
graphWithNeighbors.getEdges(), env);
 
                // Scatter: compute the number of triangles
-               DataSet<Tuple1<Integer>> numberOfTriangles = 
graphWithPropagatedNeighbors.getTriplets()
-                               .map(new ComputeTriangles()).reduce(new 
ReduceFunction<Tuple1<Integer>>() {
+               DataSet<Integer> numberOfTriangles = 
graphWithPropagatedNeighbors.getTriplets()
+                               .map(new ComputeTriangles<K>()).reduce(new 
ReduceFunction<Integer>() {
 
                                        @Override
-                                       public Tuple1<Integer> 
reduce(Tuple1<Integer> firstTuple, Tuple1<Integer> secondTuple) throws 
Exception {
-                                               return new 
Tuple1<Integer>(firstTuple.f0 + secondTuple.f0);
+                                       public Integer reduce(Integer first, 
Integer second) throws Exception {
+                                               return first + second;
                                        }
                                });
 
@@ -109,24 +107,25 @@ public class GSATriangleCount implements
        }
 
        @SuppressWarnings("serial")
-       private static final class OrderEdges implements MapFunction<Edge<Long, 
NullValue>, Edge<Long, NullValue>> {
+       private static final class OrderEdges<K extends Comparable<K>, EV> 
implements
+               MapFunction<Edge<K, EV>, Edge<K, NullValue>> {
 
                @Override
-               public Edge<Long, NullValue> map(Edge<Long, NullValue> edge) 
throws Exception {
-                       if (edge.getSource() < edge.getTarget()) {
-                               return new Edge<Long, 
NullValue>(edge.getTarget(), edge.getSource(), NullValue.getInstance());
+               public Edge<K, NullValue> map(Edge<K, EV> edge) throws 
Exception {
+                       if (edge.getSource().compareTo(edge.getTarget()) < 0) {
+                               return new Edge<K, NullValue>(edge.getTarget(), 
edge.getSource(), NullValue.getInstance());
                        } else {
-                               return edge;
+                               return new Edge<K, NullValue>(edge.getSource(), 
edge.getTarget(), NullValue.getInstance());
                        }
                }
        }
 
        @SuppressWarnings("serial")
-       private static final class VertexInitializer implements 
MapFunction<Long, TreeMap<Long, Integer>> {
+       private static final class VertexInitializer<K> implements 
MapFunction<K, TreeMap<K, Integer>> {
 
                @Override
-               public TreeMap<Long, Integer> map(Long value) throws Exception {
-                       TreeMap<Long, Integer> neighbors = new TreeMap<Long, 
Integer>();
+               public TreeMap<K, Integer> map(K value) throws Exception {
+                       TreeMap<K, Integer> neighbors = new TreeMap<K, 
Integer>();
                        neighbors.put(value, 1);
 
                        return neighbors;
@@ -134,31 +133,32 @@ public class GSATriangleCount implements
        }
 
        @SuppressWarnings("serial")
-       private static final class VertexInitializerEmptyTreeMap implements
-                       MapFunction<Vertex<Long, TreeMap<Long, Integer>>, 
TreeMap<Long, Integer>> {
+       private static final class VertexInitializerEmptyTreeMap<K> implements
+                       MapFunction<Vertex<K, TreeMap<K, Integer>>, TreeMap<K, 
Integer>> {
 
                @Override
-               public TreeMap<Long, Integer> map(Vertex<Long, TreeMap<Long, 
Integer>> vertex) throws Exception {
-                       return new TreeMap<Long, Integer>();
+               public TreeMap<K, Integer> map(Vertex<K, TreeMap<K, Integer>> 
vertex) throws Exception {
+                       return new TreeMap<K, Integer>();
                }
        }
 
        @SuppressWarnings("serial")
-       private static final class AttachValues implements 
MapFunction<Tuple2<TreeMap<Long, Integer>,
-                       TreeMap<Long, Integer>>, TreeMap<Long, Integer>> {
+       private static final class AttachValues<K> implements 
MapFunction<Tuple2<TreeMap<K, Integer>,
+                       TreeMap<K, Integer>>, TreeMap<K, Integer>> {
 
                @Override
-               public TreeMap<Long, Integer> map(Tuple2<TreeMap<Long, 
Integer>, TreeMap<Long, Integer>> tuple2) throws Exception {
+               public TreeMap<K, Integer> map(Tuple2<TreeMap<K, Integer>, 
TreeMap<K, Integer>> tuple2) throws Exception {
                        return tuple2.f1;
                }
        }
 
        @SuppressWarnings("serial")
-       private static final class GatherHigherIdNeighbors implements 
ReduceNeighborsFunction<TreeMap<Long,Integer>> {
+       private static final class GatherHigherIdNeighbors<K> implements
+               ReduceNeighborsFunction<TreeMap<K,Integer>> {
 
                @Override
-               public TreeMap<Long,Integer> 
reduceNeighbors(TreeMap<Long,Integer> first, TreeMap<Long,Integer> second) {
-                       for (Long key : second.keySet()) {
+               public TreeMap<K, Integer> reduceNeighbors(TreeMap<K,Integer> 
first, TreeMap<K,Integer> second) {
+                       for (K key : second.keySet()) {
                                Integer value = first.get(key);
                                if (value != null) {
                                        first.put(key, value + second.get(key));
@@ -171,20 +171,20 @@ public class GSATriangleCount implements
        }
 
        @SuppressWarnings("serial")
-       private static final class ComputeTriangles implements 
MapFunction<Triplet<Long, TreeMap<Long, Integer>, NullValue>,
-                       Tuple1<Integer>> {
+       private static final class ComputeTriangles<K> implements 
MapFunction<Triplet<K, TreeMap<K, Integer>, NullValue>,
+                       Integer> {
 
                @Override
-               public Tuple1<Integer> map(Triplet<Long, TreeMap<Long, 
Integer>, NullValue> triplet) throws Exception {
+               public Integer map(Triplet<K, TreeMap<K, Integer>, NullValue> 
triplet) throws Exception {
 
-                       Vertex<Long, TreeMap<Long, Integer>> srcVertex = 
triplet.getSrcVertex();
-                       Vertex<Long, TreeMap<Long, Integer>> trgVertex = 
triplet.getTrgVertex();
+                       Vertex<K, TreeMap<K, Integer>> srcVertex = 
triplet.getSrcVertex();
+                       Vertex<K, TreeMap<K, Integer>> trgVertex = 
triplet.getTrgVertex();
                        int triangles = 0;
 
                        if(trgVertex.getValue().get(srcVertex.getId()) != null) 
{
-                               
triangles=trgVertex.getValue().get(srcVertex.getId());
+                               triangles = 
trgVertex.getValue().get(srcVertex.getId());
                        }
-                       return new Tuple1<Integer>(triangles);
+                       return triangles;
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
index 2dd2180..82dfee7 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.graph.library;
 
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
 import org.apache.flink.types.NullValue;
 
 import java.util.HashMap;
@@ -42,8 +44,8 @@ import java.util.Map.Entry;
  */
 @SuppressWarnings("serial")
 
-public class LabelPropagation<K extends Comparable<K>> implements
-       GraphAlgorithm<K, Long, NullValue, Graph<K, Long, NullValue>> {
+public class LabelPropagation<K extends Comparable<K>, EV> implements 
GraphAlgorithm<K, Long, EV,
+       DataSet<Vertex<K, Long>>> {
 
        private final int maxIterations;
 
@@ -52,12 +54,12 @@ public class LabelPropagation<K extends Comparable<K>> 
implements
        }
 
        @Override
-       public Graph<K, Long, NullValue> run(Graph<K, Long, NullValue> input) {
+       public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> input) {
 
                // iteratively adopt the most frequent label among the neighbors
                // of each vertex
-               return input.runVertexCentricIteration(new 
UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(),
-                               maxIterations);
+               return input.mapEdges(new NullValueEdgeMapper<K, 
EV>()).runVertexCentricIteration(new UpdateVertexLabel<K>(), new 
SendNewLabelToNeighbors<K>(),
+                               maxIterations).getVertices();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index 47f7acd..8193dba 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -37,8 +37,7 @@ import org.apache.flink.graph.spargel.VertexUpdateFunction;
  * 
  * The implementation assumes that each page has at least one incoming and one 
outgoing link.
  */
-public class PageRank<K> implements
-       GraphAlgorithm<K, Double, Double, Graph<K, Double, Double>> {
+public class PageRank<K> implements GraphAlgorithm<K, Double, Double, 
DataSet<Vertex<K, Double>>> {
 
        private double beta;
        private int maxIterations;
@@ -66,7 +65,7 @@ public class PageRank<K> implements
        }
 
        @Override
-       public Graph<K, Double, Double> run(Graph<K, Double, Double> network) 
throws Exception {
+       public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) 
throws Exception {
 
                if (numberOfVertices == 0) {
                        numberOfVertices = network.numberOfVertices();
@@ -78,7 +77,8 @@ public class PageRank<K> implements
                                .joinWithEdgesOnSource(vertexOutDegrees, new 
InitWeightsMapper());
 
                return networkWithWeights.runVertexCentricIteration(new 
VertexRankUpdater<K>(beta, numberOfVertices),
-                               new RankMessenger<K>(numberOfVertices), 
maxIterations);
+                               new RankMessenger<K>(numberOfVertices), 
maxIterations)
+                               .getVertices();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index 1911f73..60c4c17 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.library;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
@@ -31,8 +32,7 @@ import org.apache.flink.graph.spargel.VertexUpdateFunction;
  * This is an implementation of the Single-Source-Shortest Paths algorithm, 
using a vertex-centric iteration.
  */
 @SuppressWarnings("serial")
-public class SingleSourceShortestPaths<K> implements
-       GraphAlgorithm<K, Double, Double, Graph<K, Double, Double>> {
+public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, 
Double, DataSet<Vertex<K, Double>>> {
 
        private final K srcVertexId;
        private final Integer maxIterations;
@@ -43,11 +43,11 @@ public class SingleSourceShortestPaths<K> implements
        }
 
        @Override
-       public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {
+       public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
 
                return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
                                .runVertexCentricIteration(new 
VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(),
-                               maxIterations);
+                               maxIterations).getVertices();
        }
 
        public static final class InitVerticesMapper<K> implements 
MapFunction<Vertex<K, Double>, Double> {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
new file mode 100644
index 0000000..2bd4719
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.types.NullValue;
+
+public class NullValueEdgeMapper<K, EV> implements     MapFunction<Edge<K, 
EV>, NullValue> {
+
+       private static final long serialVersionUID = 1L;
+
+       public NullValue map(Edge<K, EV> edge) {
+               return NullValue.getInstance();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
index 7a66639..2ad203f 100644
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
@@ -56,7 +56,6 @@ public class GSACompilerTest extends CompilerTestBase {
                        env.setParallelism(DEFAULT_PARALLELISM);
                        // compose test program
                        {
-                               @SuppressWarnings("unchecked")
                                DataSet<Edge<Long, NullValue>> edges = 
env.fromElements(new Tuple3<Long, Long, NullValue>(
                                                1L, 2L, 
NullValue.getInstance())).map(new Tuple3ToEdgeMap<Long, NullValue>());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
index 0a7b1c7..ced7508 100644
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
@@ -71,7 +71,6 @@ public class GSATranslationTest {
                        // ------------ construct the test program 
------------------
                        {
 
-                               @SuppressWarnings("unchecked")
                                DataSet<Edge<Long, NullValue>> edges = 
env.fromElements(new Tuple3<Long, Long, NullValue>(
                                                1L, 2L, 
NullValue.getInstance())).map(new Tuple3ToEdgeMap<Long, NullValue>());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
index 4b381b6..0213f02 100755
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -55,8 +55,8 @@ public class GatherSumApplyITCase extends 
MultipleProgramsTestBase {
                                
ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env),
                                new InitMapperCC(), env);
 
-        List<Vertex<Long, Long>> result = inputGraph.run(new 
GSAConnectedComponents(16))
-                       .getVertices().collect();
+        List<Vertex<Long, Long>> result = inputGraph.run(
+                       new GSAConnectedComponents<Long, 
NullValue>(16)).collect();
 
                expectedResult = "1,1\n" +
                                "2,1\n" +
@@ -78,8 +78,8 @@ public class GatherSumApplyITCase extends 
MultipleProgramsTestBase {
                                
SingleSourceShortestPathsData.getDefaultEdgeDataSet(env),
                                new InitMapperSSSP(), env);
 
-        List<Vertex<Long, Double>> result = inputGraph.run(new 
GSASingleSourceShortestPaths<Long>(1l, 16))
-                       .getVertices().collect();
+        List<Vertex<Long, Double>> result = inputGraph.run(
+                       new GSASingleSourceShortestPaths<Long>(1l, 
16)).collect();
 
                expectedResult = "1,0.0\n" +
                                "2,12.0\n" +

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
index 104996e..421eaa9 100644
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
@@ -50,7 +50,7 @@ public class CommunityDetectionITCase extends 
MultipleProgramsTestBase {
                Graph<Long, Long, Double> inputGraph = Graph.fromDataSet(
                                
CommunityDetectionData.getSimpleEdgeDataSet(env), new InitLabels(), env);
 
-        List<Vertex<Long, Long>> result = inputGraph.run(new 
CommunityDetection(1, CommunityDetectionData.DELTA))
+        List<Vertex<Long, Long>> result = inputGraph.run(new 
CommunityDetection<Long>(1, CommunityDetectionData.DELTA))
                        .getVertices().collect();
 
                expected = CommunityDetectionData.COMMUNITIES_SINGLE_ITERATION;
@@ -66,7 +66,7 @@ public class CommunityDetectionITCase extends 
MultipleProgramsTestBase {
                Graph<Long, Long, Double> inputGraph = Graph.fromDataSet(
                                CommunityDetectionData.getTieEdgeDataSet(env), 
new InitLabels(), env);
 
-        List<Vertex<Long, Long>> result = inputGraph.run(new 
CommunityDetection(1, CommunityDetectionData.DELTA))
+        List<Vertex<Long, Long>> result = inputGraph.run(new 
CommunityDetection<Long>(1, CommunityDetectionData.DELTA))
                        .getVertices().collect();
                expected = CommunityDetectionData.COMMUNITIES_WITH_TIE;
                compareResultAsTuples(result, expected);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
index ef4b467..9eb7387 100644
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
@@ -60,8 +60,7 @@ public class ConnectedComponentsWithRandomisedEdgesITCase 
extends JavaProgramTes
 
                Graph<Long, Long, NullValue> graph = 
Graph.fromDataSet(initialVertices, edges, env);
 
-               DataSet<Vertex<Long, Long>> result = graph
-                               .run(new 
ConnectedComponents(100)).getVertices();
+               DataSet<Vertex<Long, Long>> result = graph.run(new 
ConnectedComponents<Long, NullValue>(100));
 
                result.writeAsCsv(resultPath, "\n", " ");
                env.execute();

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
index da36ef6..8785b0d 100644
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
@@ -51,8 +51,8 @@ public class LabelPropagationITCase extends 
MultipleProgramsTestBase {
                                LabelPropagationData.getDefaultVertexSet(env),
                                
LabelPropagationData.getDefaultEdgeDataSet(env), env);
 
-        List<Vertex<Long, Long>> result = inputGraph.run(new 
LabelPropagation<Long>(1))
-                       .getVertices().collect();
+        List<Vertex<Long, Long>> result = inputGraph.run(new 
LabelPropagation<Long, NullValue>(1))
+                       .collect();
 
                expectedResult = LabelPropagationData.LABELS_AFTER_1_ITERATION;
                compareResultAsTuples(result, expectedResult);
@@ -69,8 +69,8 @@ public class LabelPropagationITCase extends 
MultipleProgramsTestBase {
                                LabelPropagationData.getTieVertexSet(env),
                                LabelPropagationData.getTieEdgeDataSet(env), 
env);
 
-        List<Vertex<Long, Long>> result = inputGraph.run(new 
LabelPropagation<Long>(1))
-                       .getVertices().collect();
+        List<Vertex<Long, Long>> result = inputGraph.run(new 
LabelPropagation<Long, NullValue>(1))
+                       .collect();
 
                expectedResult = LabelPropagationData.LABELS_WITH_TIE;
                compareResultAsTuples(result, expectedResult);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
index cc0327f..94c7713 100644
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
@@ -51,7 +51,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
                                PageRankData.getDefaultEdgeDataSet(env), new 
InitMapper(), env);
 
         List<Vertex<Long, Double>> result = inputGraph.run(new 
PageRank<Long>(0.85, 3))
-                       .getVertices().collect();
+                       .collect();
         
         compareWithDelta(result, expectedResult, 0.01);
        }
@@ -64,7 +64,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
                                PageRankData.getDefaultEdgeDataSet(env), new 
InitMapper(), env);
 
         List<Vertex<Long, Double>> result = inputGraph.run(new 
GSAPageRank<Long>(0.85, 3))
-                       .getVertices().collect();
+                       .collect();
         
         compareWithDelta(result, expectedResult, 0.01);
        }
@@ -77,7 +77,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
                                PageRankData.getDefaultEdgeDataSet(env), new 
InitMapper(), env);
 
         List<Vertex<Long, Double>> result = inputGraph.run(new 
PageRank<Long>(0.85, 5, 3))
-                       .getVertices().collect();
+                       .collect();
         
         compareWithDelta(result, expectedResult, 0.01);
        }
@@ -90,7 +90,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
                                PageRankData.getDefaultEdgeDataSet(env), new 
InitMapper(), env);
 
         List<Vertex<Long, Double>> result = inputGraph.run(new 
GSAPageRank<Long>(0.85, 5, 3))
-                       .getVertices().collect();
+                       .collect();
         
         compareWithDelta(result, expectedResult, 0.01);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
index 047bbf7..1d9ab9f 100644
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
@@ -19,12 +19,12 @@
 package org.apache.flink.graph.test.library;
 
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.example.utils.TriangleCountData;
 import org.apache.flink.graph.library.GSATriangleCount;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.types.NullValue;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -48,9 +48,9 @@ public class TriangleCountITCase extends 
MultipleProgramsTestBase {
                Graph<Long, NullValue, NullValue> graph = 
Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env),
                                env).getUndirected();
 
-               List<Tuple1<Integer>> numberOfTriangles = graph.run(new 
GSATriangleCount()).collect();
+               List<Integer> numberOfTriangles = graph.run(new 
GSATriangleCount<Long, NullValue, NullValue>()).collect();
                expectedResult = TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES;
 
-               compareResultAsTuples(numberOfTriangles, expectedResult);
+               Assert.assertEquals(numberOfTriangles.get(0).intValue(), 
Integer.parseInt(expectedResult));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
index a06f881..ffc9da9 100644
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
@@ -326,7 +326,6 @@ public class GraphOperationsITCase extends 
MultipleProgramsTestBase {
                compareResultAsTuples(result, expectedResult);
        }
 
-       @SuppressWarnings("unchecked")
        @Test
        public void testDifference2() throws Exception {
                /*

Reply via email to