[FLINK-3618] [gelly] Rename abstract UDF classes in Scatter-Gather implementation

Rename MessagingFunction to ScatterFunction
and VertexUpdateFunction to GatherFunction.

Change the parameter order in
  Graph.runScatterGatherIteration(VertexUpdateFunction, MessagingFunction)
to
  Graph.runScatterGatherIteration(ScatterFunction, GatherFunction)
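
A minimal sketch of the new argument order (scatter function first, gather
function second), written without any Flink dependency. The types `Scatter`
and `Gather` and the methods `run` and `demo` are hypothetical stand-ins, not
the Gelly API, and the loop simplifies by letting every vertex scatter in
every superstep:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: Scatter, Gather and run() are hypothetical stand-ins,
// not the Gelly ScatterFunction/GatherFunction API.
public class ScatterGatherSketch {

    /** Scatter phase: a vertex puts messages for its neighbours into the outbox. */
    public interface Scatter {
        void sendMessages(double value, Map<Long, Double> outEdges, Map<Long, List<Double>> outbox);
    }

    /** Gather phase: a vertex derives its new value from the messages it received. */
    public interface Gather {
        double updateVertex(double value, List<Double> inMessages);
    }

    /** Runs supersteps until no vertex value changes or maxIterations is reached. */
    public static Map<Long, Double> run(Map<Long, Double> vertices, Map<Long, Map<Long, Double>> edges,
                                        Scatter scatter, Gather gather, int maxIterations) {
        Map<Long, Double> current = new HashMap<>(vertices);
        for (int superstep = 1; superstep <= maxIterations; superstep++) {
            Map<Long, List<Double>> outbox = new HashMap<>();
            for (Map.Entry<Long, Double> v : current.entrySet()) {
                Map<Long, Double> outEdges = edges.getOrDefault(v.getKey(), new HashMap<>());
                scatter.sendMessages(v.getValue(), outEdges, outbox);
            }
            boolean changed = false;
            for (Map.Entry<Long, List<Double>> inbox : outbox.entrySet()) {
                double oldValue = current.getOrDefault(inbox.getKey(), Double.POSITIVE_INFINITY);
                double newValue = gather.updateVertex(oldValue, inbox.getValue());
                if (newValue != oldValue) {
                    current.put(inbox.getKey(), newValue);
                    changed = true;
                }
            }
            if (!changed) {
                break;
            }
        }
        return current;
    }

    /** Single-source shortest paths on a three-vertex graph, source vertex 1. */
    public static Map<Long, Double> demo() {
        Map<Long, Double> vertices = new HashMap<>();
        vertices.put(1L, 0.0);
        vertices.put(2L, Double.POSITIVE_INFINITY);
        vertices.put(3L, Double.POSITIVE_INFINITY);

        Map<Long, Map<Long, Double>> edges = new HashMap<>();
        edges.computeIfAbsent(1L, k -> new HashMap<>()).put(2L, 1.0);
        edges.computeIfAbsent(1L, k -> new HashMap<>()).put(3L, 5.0);
        edges.computeIfAbsent(2L, k -> new HashMap<>()).put(3L, 2.0);

        // Scatter: reachable vertices send (own distance + edge weight) to each target.
        Scatter minDistanceMessenger = (value, outEdges, outbox) -> {
            if (value < Double.POSITIVE_INFINITY) {
                for (Map.Entry<Long, Double> e : outEdges.entrySet()) {
                    outbox.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(value + e.getValue());
                }
            }
        };
        // Gather: keep the smallest distance seen so far.
        Gather vertexDistanceUpdater = (value, inMessages) -> {
            double min = value;
            for (double m : inMessages) {
                min = Math.min(min, m);
            }
            return min;
        };

        // New order: scatter function first, gather function second.
        return run(vertices, edges, minDistanceMessenger, vertexDistanceUpdater, 10);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Call sites that still pass the update function first need their first two
arguments swapped.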

This closes #2184


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/918e5d0c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/918e5d0c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/918e5d0c

Branch: refs/heads/master
Commit: 918e5d0c9dae7a080dcf2505df38af4c655ba6b6
Parents: 6c6b17b
Author: Greg Hogan <c...@greghogan.com>
Authored: Fri May 6 15:39:38 2016 -0400
Committer: Greg Hogan <c...@greghogan.com>
Committed: Thu Jun 30 13:04:32 2016 -0400

----------------------------------------------------------------------
 docs/apis/batch/libs/gelly.md                   |  78 ++--
 .../flink/graph/examples/HITSAlgorithm.java     | 112 +++--
 .../flink/graph/examples/IncrementalSSSP.java   |  40 +-
 .../examples/SingleSourceShortestPaths.java     |  42 +-
 .../examples/SingleSourceShortestPaths.scala    |  47 ++-
 .../test/examples/IncrementalSSSPITCase.java    |   2 +-
 .../org/apache/flink/graph/scala/Graph.scala    | 318 +++++++--------
 .../main/java/org/apache/flink/graph/Graph.java |  30 +-
 .../flink/graph/IterationConfiguration.java     |   5 +-
 .../flink/graph/library/CommunityDetection.java |  46 +--
 .../graph/library/ConnectedComponents.java      |  54 +--
 .../graph/library/GSAConnectedComponents.java   |   2 +-
 .../library/GSASingleSourceShortestPaths.java   |   2 +-
 .../flink/graph/library/LabelPropagation.java   |  54 +--
 .../apache/flink/graph/library/PageRank.java    |  53 ++-
 .../library/SingleSourceShortestPaths.java      |  44 +-
 .../flink/graph/library/TriangleEnumerator.java |   6 +-
 .../flink/graph/spargel/GatherFunction.java     | 251 ++++++++++++
 .../flink/graph/spargel/MessagingFunction.java  | 338 ---------------
 .../flink/graph/spargel/ScatterFunction.java    | 338 +++++++++++++++
 .../spargel/ScatterGatherConfiguration.java     |  44 +-
 .../graph/spargel/ScatterGatherIteration.java   | 407 +++++++++----------
 .../graph/spargel/VertexUpdateFunction.java     | 251 ------------
 .../graph/spargel/SpargelCompilerTest.java      |  12 +-
 .../graph/spargel/SpargelTranslationTest.java   | 119 +++---
 .../test/CollectionModeSuperstepITCase.java     |  35 +-
 .../test/ScatterGatherConfigurationITCase.java  | 227 +++++------
 27 files changed, 1460 insertions(+), 1497 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index aa232fc..f063f09 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -1083,10 +1083,10 @@ final class Compute extends ComputeFunction {
 ### Scatter-Gather Iterations
 The scatter-gather model, also known as "signal/collect" model, expresses 
computation from the perspective of a vertex in the graph. The computation 
proceeds in synchronized iteration steps, called supersteps. In each superstep, 
a vertex produces messages for other vertices and updates its value based on 
the messages it receives. To use scatter-gather iterations in Gelly, the user 
only needs to define how a vertex behaves in each superstep:
 
-* <strong>Messaging</strong>:  corresponds to the scatter phase and produces 
the messages that a vertex will send to other vertices.
-* <strong>Value Update</strong>: corresponds to the gather phase and updates 
the vertex value using the received messages.
+* <strong>Scatter</strong>:  produces the messages that a vertex will send to 
other vertices.
+* <strong>Gather</strong>: updates the vertex value using received messages.
 
-Gelly provides methods for scatter-gather iterations. The user only needs to 
implement two functions, corresponding to the scatter and gather phases. The 
first function is a `MessagingFunction`, which allows a vertex to send out 
messages for other vertices. Messages are recieved during the same superstep as 
they are sent. The second function is `VertexUpdateFunction`, which defines how 
a vertex will update its value based on the received messages.
+Gelly provides methods for scatter-gather iterations. The user only needs to 
implement two functions, corresponding to the scatter and gather phases. The 
first function is a `ScatterFunction`, which allows a vertex to send out 
messages to other vertices. Messages are received during the same superstep as 
they are sent. The second function is `GatherFunction`, which defines how a 
vertex will update its value based on the received messages.
 These functions and the maximum number of iterations to run are given as 
parameters to Gelly's `runScatterGatherIteration`. This method will execute the 
scatter-gather iteration on the input Graph and return a new Graph, with 
updated vertex values.
 
 A scatter-gather iteration can be extended with information such as the total 
number of vertices, the in degree and out degree.
@@ -1109,7 +1109,7 @@ int maxIterations = 10;
 
 // Execute the scatter-gather iteration
 Graph<Long, Double, Double> result = graph.runScatterGatherIteration(
-                       new VertexDistanceUpdater(), new 
MinDistanceMessenger(), maxIterations);
+                       new MinDistanceMessenger(), new 
VertexDistanceUpdater(), maxIterations);
 
 // Extract the vertices as the result
 DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
@@ -1118,7 +1118,7 @@ DataSet<Vertex<Long, Double>> singleSourceShortestPaths = 
result.getVertices();
 // - - -  UDFs - - - //
 
 // scatter: messaging
-public static final class MinDistanceMessenger extends MessagingFunction<Long, 
Double, Double, Double> {
+public static final class MinDistanceMessenger extends ScatterFunction<Long, 
Double, Double, Double> {
 
        public void sendMessages(Vertex<Long, Double> vertex) {
                for (Edge<Long, Double> edge : getEdges()) {
@@ -1128,7 +1128,7 @@ public static final class MinDistanceMessenger extends 
MessagingFunction<Long, D
 }
 
 // gather: vertex update
-public static final class VertexDistanceUpdater extends 
VertexUpdateFunction<Long, Double, Double> {
+public static final class VertexDistanceUpdater extends GatherFunction<Long, 
Double, Double> {
 
        public void updateVertex(Vertex<Long, Double> vertex, 
MessageIterator<Double> inMessages) {
                Double minDistance = Double.MAX_VALUE;
@@ -1157,7 +1157,7 @@ val graph: Graph[Long, Double, Double] = ...
 val maxIterations = 10
 
 // Execute the scatter-gather iteration
-val result = graph.runScatterGatherIteration(new VertexDistanceUpdater, new 
MinDistanceMessenger, maxIterations)
+val result = graph.runScatterGatherIteration(new MinDistanceMessenger, new 
VertexDistanceUpdater, maxIterations)
 
 // Extract the vertices as the result
 val singleSourceShortestPaths = result.getVertices
@@ -1166,7 +1166,7 @@ val singleSourceShortestPaths = result.getVertices
 // - - -  UDFs - - - //
 
 // messaging
-final class MinDistanceMessenger extends MessagingFunction[Long, Double, 
Double, Double] {
+final class MinDistanceMessenger extends ScatterFunction[Long, Double, Double, 
Double] {
 
        override def sendMessages(vertex: Vertex[Long, Double]) = {
                for (edge: Edge[Long, Double] <- getEdges) {
@@ -1176,7 +1176,7 @@ final class MinDistanceMessenger extends 
MessagingFunction[Long, Double, Double,
 }
 
 // vertex update
-final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, 
Double] {
+final class VertexDistanceUpdater extends GatherFunction[Long, Double, Double] 
{
 
        override def updateVertex(vertex: Vertex[Long, Double], inMessages: 
MessageIterator[Double]) = {
                var minDistance = Double.MaxValue
@@ -1211,9 +1211,9 @@ and can be specified using the `setName()` method.
 * <strong>Solution set in unmanaged memory</strong>: Defines whether the 
solution set is kept in managed memory (Flink's internal way of keeping objects 
in serialized form) or as a simple object map. By default, the solution set 
runs in managed memory. This property can be set using the 
`setSolutionSetUnmanagedMemory()` method.
 
 * <strong>Aggregators</strong>: Iteration aggregators can be registered using 
the `registerAggregator()` method. An iteration aggregator combines
-all aggregates globally once per superstep and makes them available in the 
next superstep. Registered aggregators can be accessed inside the user-defined 
`VertexUpdateFunction` and `MessagingFunction`.
+all aggregates globally once per superstep and makes them available in the 
next superstep. Registered aggregators can be accessed inside the user-defined 
`ScatterFunction` and `GatherFunction`.
 
-* <strong>Broadcast Variables</strong>: DataSets can be added as [Broadcast 
Variables]({{site.baseurl}}/apis/batch/index.html#broadcast-variables) to the 
`VertexUpdateFunction` and `MessagingFunction`, using the 
`addBroadcastSetForUpdateFunction()` and 
`addBroadcastSetForMessagingFunction()` methods, respectively.
+* <strong>Broadcast Variables</strong>: DataSets can be added as [Broadcast 
Variables]({{site.baseurl}}/apis/batch/index.html#broadcast-variables) to the 
`ScatterFunction` and `GatherFunction`, using the 
`addBroadcastSetForMessagingFunction()` and 
`addBroadcastSetForUpdateFunction()` methods, respectively.
 
 * <strong>Number of Vertices</strong>: Accessing the total number of vertices 
within the iteration. This property can be set using the `setOptNumVertices()` 
method.
 The number of vertices can then be accessed in the vertex update function and 
in the messaging function using the `getNumberOfVertices()` method. If the 
option is not set in the configuration, this method will return -1.
@@ -1245,10 +1245,12 @@ parameters.registerAggregator("sumAggregator", new 
LongSumAggregator());
 // run the scatter-gather iteration, also passing the configuration parameters
 Graph<Long, Double, Double> result =
                        graph.runScatterGatherIteration(
-                       new VertexUpdater(), new Messenger(), maxIterations, 
parameters);
+                       new Messenger(), new VertexUpdater(), maxIterations, 
parameters);
 
 // user-defined functions
-public static final class VertexUpdater extends VertexUpdateFunction {
+public static final class Messenger extends ScatterFunction {...}
+
+public static final class VertexUpdater extends GatherFunction {
 
        LongSumAggregator aggregator = new LongSumAggregator();
 
@@ -1272,8 +1274,6 @@ public static final class VertexUpdater extends 
VertexUpdateFunction {
        }
 }
 
-public static final class Messenger extends MessagingFunction {...}
-
 {% endhighlight %}
 </div>
 
@@ -1294,10 +1294,12 @@ parameters.setParallelism(16)
 parameters.registerAggregator("sumAggregator", new LongSumAggregator)
 
 // run the scatter-gather iteration, also passing the configuration parameters
-val result = graph.runScatterGatherIteration(new VertexUpdater, new Messenger, 
maxIterations, parameters)
+val result = graph.runScatterGatherIteration(new Messenger, new VertexUpdater, 
maxIterations, parameters)
 
 // user-defined functions
-final class VertexUpdater extends VertexUpdateFunction {
+final class Messenger extends ScatterFunction {...}
+
+final class VertexUpdater extends GatherFunction {
 
        var aggregator = new LongSumAggregator
 
@@ -1321,8 +1323,6 @@ final class VertexUpdater extends VertexUpdateFunction {
        }
 }
 
-final class Messenger extends MessagingFunction {...}
-
 {% endhighlight %}
 </div>
 </div>
@@ -1347,20 +1347,20 @@ parameters.setOptDegrees(true);
 // run the scatter-gather iteration, also passing the configuration parameters
 Graph<Long, Double, Double> result =
                        graph.runScatterGatherIteration(
-                       new VertexUpdater(), new Messenger(), maxIterations, 
parameters);
+                       new Messenger(), new VertexUpdater(), maxIterations, 
parameters);
 
 // user-defined functions
-public static final class VertexUpdater {
+public static final class Messenger extends ScatterFunction {
        ...
-       // get the number of vertices
-       long numVertices = getNumberOfVertices();
+       // retrieve the vertex out-degree
+       outDegree = getOutDegree();
        ...
 }
 
-public static final class Messenger {
+public static final class VertexUpdater extends GatherFunction {
        ...
-       // retrieve the vertex out-degree
-       outDegree = getOutDegree();
+       // get the number of vertices
+       long numVertices = getNumberOfVertices();
        ...
 }
 
@@ -1382,20 +1382,20 @@ parameters.setOptNumVertices(true)
 parameters.setOptDegrees(true)
 
 // run the scatter-gather iteration, also passing the configuration parameters
-val result = graph.runScatterGatherIteration(new VertexUpdater, new Messenger, 
maxIterations, parameters)
+val result = graph.runScatterGatherIteration(new Messenger, new VertexUpdater, 
maxIterations, parameters)
 
 // user-defined functions
-final class VertexUpdater {
+final class Messenger extends ScatterFunction {
        ...
-       // get the number of vertices
-       val numVertices = getNumberOfVertices
+       // retrieve the vertex out-degree
+       val outDegree = getOutDegree
        ...
 }
 
-final class Messenger {
+final class VertexUpdater extends GatherFunction {
        ...
-       // retrieve the vertex out-degree
-       val outDegree = getOutDegree
+       // get the number of vertices
+       val numVertices = getNumberOfVertices
        ...
 }
 
@@ -1419,13 +1419,13 @@ parameters.setDirection(EdgeDirection.IN);
 // run the scatter-gather iteration, also passing the configuration parameters
 DataSet<Vertex<Long, HashSet<Long>>> result =
                        graph.runScatterGatherIteration(
-                       new VertexUpdater(), new Messenger(), maxIterations, 
parameters)
+                       new Messenger(), new VertexUpdater(), maxIterations, 
parameters)
                        .getVertices();
 
 // user-defined functions
-public static final class VertexUpdater {...}
+public static final class Messenger extends ScatterFunction {...}
 
-public static final class Messenger {...}
+public static final class VertexUpdater extends GatherFunction {...}
 
 {% endhighlight %}
 </div>
@@ -1441,13 +1441,13 @@ val parameters = new ScatterGatherConfiguration
 parameters.setDirection(EdgeDirection.IN)
 
 // run the scatter-gather iteration, also passing the configuration parameters
-val result = graph.runScatterGatherIteration(new VertexUpdater, new Messenger, 
maxIterations, parameters)
+val result = graph.runScatterGatherIteration(new Messenger, new VertexUpdater, 
maxIterations, parameters)
                        .getVertices
 
 // user-defined functions
-final class VertexUpdater {...}
+final class Messenger extends ScatterFunction {...}
 
-final class Messenger {...}
+final class VertexUpdater extends GatherFunction {...}
 
 {% endhighlight %}
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java
index 129d2a6..ff5a2e9 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java
@@ -21,17 +21,16 @@ package org.apache.flink.graph.examples;
 import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
-
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.types.DoubleValue;
 import org.apache.flink.util.Preconditions;
 
@@ -100,17 +99,55 @@ public class HITSAlgorithm<K, VV, EV> implements 
GraphAlgorithm<K, VV, EV, DataS
                parameter.registerAggregator("diffValueSum", new 
DoubleSumAggregator());
 
                return newGraph
-                               .runScatterGatherIteration(new 
VertexUpdate<K>(maxIterations, convergeThreshold),
-                                               new 
MessageUpdate<K>(maxIterations), maxIterations, parameter)
+                               .runScatterGatherIteration(new 
MessageUpdate<K>(maxIterations),
+                                               new 
VertexUpdate<K>(maxIterations, convergeThreshold), maxIterations, parameter)
                                .getVertices();
        }
 
        /**
+        * Distributes the value of a vertex among all neighbor vertices and 
sum all the
+        * value in every superstep.
+        */
+       private static final class MessageUpdate<K> extends ScatterFunction<K, 
Tuple2<DoubleValue, DoubleValue>, Double, Boolean> {
+               private int maxIteration;
+
+               public MessageUpdate(int maxIteration) {
+                       this.maxIteration = maxIteration;
+               }
+
+               @Override
+               public void sendMessages(Vertex<K, Tuple2<DoubleValue, 
DoubleValue>> vertex) {
+                       // in the first iteration, no aggregation to call, init 
sum with value of vertex
+                       double iterationValueSum = 1.0;
+
+                       if (getSuperstepNumber() > 1) {
+                               iterationValueSum = Math.sqrt(((DoubleValue) 
getPreviousIterationAggregate("updatedValueSum")).getValue());
+                       }
+                       for (Edge<K, Boolean> edge : getEdges()) {
+                               if (getSuperstepNumber() != maxIteration) {
+                                       if (getSuperstepNumber() % 2 == 1) {
+                                               if (edge.getValue()) {
+                                                       
sendMessageTo(edge.getTarget(), vertex.getValue().f0.getValue() / 
iterationValueSum);
+                                               }
+                                       } else {
+                                               if (!edge.getValue()) {
+                                                       
sendMessageTo(edge.getTarget(), vertex.getValue().f1.getValue() / 
iterationValueSum);
+                                               }
+                                       }
+                               } else {
+                                       if (!edge.getValue()) {
+                                               sendMessageTo(edge.getTarget(), 
iterationValueSum);
+                                       }
+                               }
+                       }
+               }
+       }
+
+       /**
         * Function that updates the value of a vertex by summing up the partial
         * values from all messages and normalize the value.
         */
-       @SuppressWarnings("serial")
-       public static final class VertexUpdate<K> extends 
VertexUpdateFunction<K, Tuple2<DoubleValue, DoubleValue>, Double> {
+       private static final class VertexUpdate<K> extends GatherFunction<K, 
Tuple2<DoubleValue, DoubleValue>, Double> {
                private int maxIteration;
                private double convergeThreshold;
                private DoubleSumAggregator updatedValueSumAggregator;
@@ -162,7 +199,7 @@ public class HITSAlgorithm<K, VV, EV> implements 
GraphAlgorithm<K, VV, EV, DataS
                                                diffValueSum = ((DoubleValue) 
getPreviousIterationAggregate("diffValueSum")).getValue();
                                        }
                                        
authoritySumAggregator.aggregate(previousAuthAverage);
-                                       
+
                                        if (diffValueSum > convergeThreshold) {
                                                
newHubValue.setValue(newHubValue.getValue() / iterationValueSum);
                                                
newAuthorityValue.setValue(updateValue);
@@ -191,77 +228,24 @@ public class HITSAlgorithm<K, VV, EV> implements 
GraphAlgorithm<K, VV, EV, DataS
                }
        }
 
-       /**
-        * Distributes the value of a vertex among all neighbor vertices and 
sum all the
-        * value in every superstep.
-        */
-       @SuppressWarnings("serial")
-       public static final class MessageUpdate<K> extends MessagingFunction<K, 
Tuple2<DoubleValue, DoubleValue>, Double, Boolean> {
-               private int maxIteration;
-
-               public MessageUpdate(int maxIteration) {
-                       this.maxIteration = maxIteration;
-               }
-
-               @Override
-               public void sendMessages(Vertex<K, Tuple2<DoubleValue, 
DoubleValue>> vertex) {
-
-                       // in the first iteration, no aggregation to call, init 
sum with value of vertex
-                       double iterationValueSum = 1.0;
-
-                       if (getSuperstepNumber() > 1) {
-                               iterationValueSum = Math.sqrt(((DoubleValue) 
getPreviousIterationAggregate("updatedValueSum")).getValue());
-                       }
-                       for (Edge<K, Boolean> edge : getEdges()) {
-                               if (getSuperstepNumber() != maxIteration) {
-                                       if (getSuperstepNumber() % 2 == 1) {
-                                               if (edge.getValue()) {
-                                                       
sendMessageTo(edge.getTarget(), vertex.getValue().f0.getValue() / 
iterationValueSum);
-                                               }
-                                       } else {
-                                               if (!edge.getValue()) {
-                                                       
sendMessageTo(edge.getTarget(), vertex.getValue().f1.getValue() / 
iterationValueSum);
-                                               }
-                                       }
-                               } else {
-                                       if (!edge.getValue()) {
-                                               sendMessageTo(edge.getTarget(), 
iterationValueSum);
-                                       }
-                               }
-                       }
-               }
-       }
-
-       public static class VertexInitMapper<K, VV> implements 
MapFunction<Vertex<K, VV>, Tuple2<DoubleValue, DoubleValue>> {
-
-               private static final long serialVersionUID = 1L;
-
+       private static class VertexInitMapper<K, VV> implements 
MapFunction<Vertex<K, VV>, Tuple2<DoubleValue, DoubleValue>> {
                private Tuple2<DoubleValue, DoubleValue> initVertexValue = new 
Tuple2<>(new DoubleValue(1.0), new DoubleValue(1.0));
 
                public Tuple2<DoubleValue, DoubleValue> map(Vertex<K, VV> 
value) {
-
                        //init hub and authority value of each vertex
                        return initVertexValue;
                }
        }
 
-       public static class AuthorityEdgeMapper<K, EV> implements 
MapFunction<Edge<K, EV>, Boolean> {
-
-               private static final long serialVersionUID = 1L;
-
+       private static class AuthorityEdgeMapper<K, EV> implements 
MapFunction<Edge<K, EV>, Boolean> {
                public Boolean map(Edge<K, EV> edge) {
-                       
                        // mark edge as true for authority updating
                        return true;
                }
        }
 
-       public static class HubEdgeMapper<K, EV> implements MapFunction<Edge<K, 
EV>, Boolean> {
-
-               private static final long serialVersionUID = 1L;
-
+       private static class HubEdgeMapper<K, EV> implements 
MapFunction<Edge<K, EV>, Boolean> {
                public Boolean map(Edge<K, EV> edge) {
-                       
                        // mark edge as false for hub updating
                        return false;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java
index 26e419f..631384c 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java
@@ -27,10 +27,10 @@ import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.examples.data.IncrementalSSSPData;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
 
 /**
  * This example illustrates how to 
@@ -97,8 +97,8 @@ public class IncrementalSSSP implements ProgramDescription {
                        parameters.setOptDegrees(true);
 
                        // run the scatter-gather iteration to propagate info
-                       Graph<Long, Double, Double> result = 
ssspGraph.runScatterGatherIteration(new VertexDistanceUpdater(),
-                                       new 
InvalidateMessenger(edgeToBeRemoved), maxIterations, parameters);
+                       Graph<Long, Double, Double> result = 
ssspGraph.runScatterGatherIteration(new InvalidateMessenger(edgeToBeRemoved),
+                                       new VertexDistanceUpdater(), 
maxIterations, parameters);
 
                        DataSet<Vertex<Long, Double>> resultedVertices = 
result.getVertices();
 
@@ -147,22 +147,7 @@ public class IncrementalSSSP implements ProgramDescription 
{
                }).count() > 0;
        }
 
-       public static final class VertexDistanceUpdater extends 
VertexUpdateFunction<Long, Double, Double> {
-
-               @Override
-               public void updateVertex(Vertex<Long, Double> vertex, 
MessageIterator<Double> inMessages) throws Exception {
-                       if (inMessages.hasNext()) {
-                               Long outDegree = getOutDegree() - 1;
-                               // check if the vertex has another SP-Edge
-                               if (outDegree <= 0) {
-                                       // set own value to infinity
-                                       setNewVertexValue(Double.MAX_VALUE);
-                               }
-                       }
-               }
-       }
-
-       public static final class InvalidateMessenger extends 
MessagingFunction<Long, Double, Double, Double> {
+       public static final class InvalidateMessenger extends 
ScatterFunction<Long, Double, Double, Double> {
 
                private Edge<Long, Double> edgeToBeRemoved;
 
@@ -190,6 +175,21 @@ public class IncrementalSSSP implements ProgramDescription 
{
                }
        }
 
+       public static final class VertexDistanceUpdater extends 
GatherFunction<Long, Double, Double> {
+
+               @Override
+               public void updateVertex(Vertex<Long, Double> vertex, 
MessageIterator<Double> inMessages) throws Exception {
+                       if (inMessages.hasNext()) {
+                               Long outDegree = getOutDegree() - 1;
+                               // check if the vertex has another SP-Edge
+                               if (outDegree <= 0) {
+                                       // set own value to infinity
+                                       setNewVertexValue(Double.MAX_VALUE);
+                               }
+                       }
+               }
+       }
+
        // 
******************************************************************************************************************
        // UTIL METHODS
        // 
******************************************************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java
index c9abf02..68d20e0 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java
@@ -26,9 +26,9 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 
 /**
@@ -62,7 +62,7 @@ public class SingleSourceShortestPaths implements 
ProgramDescription {
 
                // Execute the scatter-gather iteration
                Graph<Long, Double, Double> result = 
graph.runScatterGatherIteration(
-                               new VertexDistanceUpdater(), new 
MinDistanceMessenger(), maxIterations);
+                               new MinDistanceMessenger(), new 
VertexDistanceUpdater(), maxIterations);
 
                // Extract the vertices as the result
                DataSet<Vertex<Long, Double>> singleSourceShortestPaths = 
result.getVertices();
@@ -103,11 +103,28 @@ public class SingleSourceShortestPaths implements 
ProgramDescription {
        }
 
        /**
+        * Distributes the minimum distance associated with a given vertex 
among all
+        * the target vertices summed up with the edge's value.
+        */
+       @SuppressWarnings("serial")
+       private static final class MinDistanceMessenger extends 
ScatterFunction<Long, Double, Double, Double> {
+
+               @Override
+               public void sendMessages(Vertex<Long, Double> vertex) {
+                       if (vertex.getValue() < Double.POSITIVE_INFINITY) {
+                               for (Edge<Long, Double> edge : getEdges()) {
+                                       sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
+                               }
+                       }
+               }
+       }
+
+       /**
         * Function that updates the value of a vertex by picking the minimum
         * distance from all incoming messages.
         */
        @SuppressWarnings("serial")
-       public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {
+       private static final class VertexDistanceUpdater extends GatherFunction<Long, Double, Double> {
 
                @Override
                 public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) {
@@ -126,23 +143,6 @@ public class SingleSourceShortestPaths implements ProgramDescription {
                }
        }
 
-       /**
-        * Distributes the minimum distance associated with a given vertex among all
-        * the target vertices summed up with the edge's value.
-        */
-       @SuppressWarnings("serial")
-       public static final class MinDistanceMessenger extends MessagingFunction<Long, Double, Double, Double> {
-
-               @Override
-               public void sendMessages(Vertex<Long, Double> vertex) {
-                       if (vertex.getValue() < Double.POSITIVE_INFINITY) {
-                               for (Edge<Long, Double> edge : getEdges()) {
-                                       sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
-                               }
-                       }
-               }
-       }
-
        // ******************************************************************************************************************
        // UTIL METHODS
        // ******************************************************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala
index 4f84bb0..2d623e7 100644
--- a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala
+++ b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala
@@ -22,11 +22,10 @@ import org.apache.flink.api.scala._
 import org.apache.flink.graph.scala._
 import org.apache.flink.graph.Edge
 import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.graph.spargel.VertexUpdateFunction
-import org.apache.flink.graph.spargel.MessageIterator
+import org.apache.flink.graph.spargel.{MessageIterator, ScatterFunction, GatherFunction}
 import org.apache.flink.graph.Vertex
-import org.apache.flink.graph.spargel.MessagingFunction
 import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData
+
 import scala.collection.JavaConversions._
 import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
 
@@ -55,8 +54,8 @@ object SingleSourceShortestPaths {
     val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
 
     // Execute the scatter-gather iteration
-    val result = graph.runScatterGatherIteration(new VertexDistanceUpdater,
-      new MinDistanceMessenger, maxIterations)
+    val result = graph.runScatterGatherIteration(new MinDistanceMessenger,
+      new VertexDistanceUpdater, maxIterations)
 
     // Extract the vertices as the result
     val singleSourceShortestPaths = result.getVertices
@@ -86,10 +85,26 @@ object SingleSourceShortestPaths {
   }
 
   /**
-   * Function that updates the value of a vertex by picking the minimum
-   * distance from all incoming messages.
+   * Distributes the minimum distance associated with a given vertex among all
+   * the target vertices summed up with the edge's value.
    */
-  private final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Double] {
+  private final class MinDistanceMessenger extends
+    ScatterFunction[Long, Double, Double, Double] {
+
+    override def sendMessages(vertex: Vertex[Long, Double]) {
+      if (vertex.getValue < Double.PositiveInfinity) {
+        for (edge: Edge[Long, Double] <- getEdges) {
+          sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
+        }
+      }
+    }
+  }
+
+  /**
+    * Function that updates the value of a vertex by picking the minimum
+    * distance from all incoming messages.
+    */
+  private final class VertexDistanceUpdater extends GatherFunction[Long, Double, Double] {
 
     override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) {
       var minDistance = Double.MaxValue
@@ -105,22 +120,6 @@ object SingleSourceShortestPaths {
     }
   }
 
-  /**
-   * Distributes the minimum distance associated with a given vertex among all
-   * the target vertices summed up with the edge's value.
-   */
-  private final class MinDistanceMessenger extends
-    MessagingFunction[Long, Double, Double, Double] {
-
-    override def sendMessages(vertex: Vertex[Long, Double]) {
-      if (vertex.getValue < Double.PositiveInfinity) {
-        for (edge: Edge[Long, Double] <- getEdges) {
-          sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
-        }
-      }
-    }
-  }
-
   // ****************************************************************************
   // UTIL METHODS
   // ****************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
index d27dcd8..24b8cf1 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
@@ -111,8 +111,8 @@ public class IncrementalSSSPITCase extends MultipleProgramsTestBase {
 
                        // run the scatter gather iteration to propagate info
                         Graph<Long, Double, Double> result = ssspGraph.runScatterGatherIteration(
-                                       new IncrementalSSSP.VertexDistanceUpdater(),
                                         new IncrementalSSSP.InvalidateMessenger(edgeToBeRemoved),
+                                       new IncrementalSSSP.VertexDistanceUpdater(),
                                         IncrementalSSSPData.NUM_VERTICES, parameters);
 
                         DataSet<Vertex<Long, Double>> resultedVertices = result.getVertices();

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index f7e13ba..4dd9d12 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -24,9 +24,9 @@ import org.apache.flink.api.java.{tuple => jtuple}
 import org.apache.flink.api.scala._
 import org.apache.flink.graph._
 import org.apache.flink.graph.asm.translate.TranslateFunction
-import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, GatherFunction, SumFunction}
+import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, SumFunction, GatherFunction => GSAGatherFunction}
 import org.apache.flink.graph.pregel.{ComputeFunction, MessageCombiner, VertexCentricConfiguration}
-import org.apache.flink.graph.spargel.{MessagingFunction, ScatterGatherConfiguration, VertexUpdateFunction}
+import org.apache.flink.graph.spargel.{ScatterFunction, ScatterGatherConfiguration, GatherFunction => SpargelGatherFunction}
 import org.apache.flink.graph.validation.GraphValidator
 import org.apache.flink.types.{LongValue, NullValue}
 import org.apache.flink.util.Preconditions
@@ -38,8 +38,8 @@ import _root_.scala.reflect.ClassTag
 object Graph {
 
   /**
-  * Creates a Graph from a DataSet of vertices and a DataSet of edges.
-  */
+   * Creates a Graph from a DataSet of vertices and a DataSet of edges.
+   */
   def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
   TypeInformation : ClassTag](vertices: DataSet[Vertex[K, VV]], edges: DataSet[Edge[K, EV]],
                               env: ExecutionEnvironment): Graph[K, VV, EV] = {
@@ -47,19 +47,19 @@ object Graph {
   }
 
   /**
-  * Creates a Graph from a DataSet of edges.
-  * Vertices are created automatically and their values are set to NullValue.
-  */
+   * Creates a Graph from a DataSet of edges.
+   * Vertices are created automatically and their values are set to NullValue.
+   */
   def fromDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
   (edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
     wrapGraph(jg.Graph.fromDataSet[K, EV](edges.javaSet, env.getJavaEnv))
   }
 
   /**
-  * Creates a graph from a DataSet of edges.
-  * Vertices are created automatically and their values are set by applying the provided
-  * vertexValueInitializer map function to the vertex ids.
-  */
+   * Creates a graph from a DataSet of edges.
+   * Vertices are created automatically and their values are set by applying the provided
+   * vertexValueInitializer map function to the vertex ids.
+   */
   def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
   TypeInformation : ClassTag](edges: DataSet[Edge[K, EV]],
   vertexValueInitializer: MapFunction[K, VV], env: ExecutionEnvironment): Graph[K, VV, EV] = {
@@ -68,8 +68,8 @@ object Graph {
   }
 
   /**
-  * Creates a Graph from a Seq of vertices and a Seq of edges.
-  */
+   * Creates a Graph from a Seq of vertices and a Seq of edges.
+   */
   def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
   TypeInformation : ClassTag](vertices: Seq[Vertex[K, VV]], edges: Seq[Edge[K, EV]], env:
   ExecutionEnvironment): Graph[K, VV, EV] = {
@@ -78,19 +78,19 @@ object Graph {
   }
 
   /**
-  * Creates a Graph from a Seq of edges.
-  * Vertices are created automatically and their values are set to NullValue.
-  */
+   * Creates a Graph from a Seq of edges.
+   * Vertices are created automatically and their values are set to NullValue.
+   */
   def fromCollection[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
   (edges: Seq[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
     wrapGraph(jg.Graph.fromCollection[K, EV](edges.asJavaCollection, env.getJavaEnv))
   }
 
   /**
-  * Creates a graph from a Seq of edges.
-  * Vertices are created automatically and their values are set by applying 
the provided
-  * vertexValueInitializer map function to the vertex ids.
-  */
+   * Creates a graph from a Seq of edges.
+   * Vertices are created automatically and their values are set by applying 
the provided
+   * vertexValueInitializer map function to the vertex ids.
+   */
   def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : 
ClassTag, EV:
   TypeInformation : ClassTag](edges: Seq[Edge[K, EV]], vertexValueInitializer: 
MapFunction[K, VV],
   env: ExecutionEnvironment): Graph[K, VV, EV] = {
@@ -105,7 +105,7 @@ object Graph {
    * The first field of the Tuple3 object for edges will become the source ID,
    * the second field will become the target ID, and the third field will 
become
    * the edge value. 
-  */
+   */
   def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : 
ClassTag, EV:
   TypeInformation : ClassTag](vertices: DataSet[(K, VV)], edges: DataSet[(K, 
K, EV)],
                               env: ExecutionEnvironment): Graph[K, VV, EV] = {
@@ -116,12 +116,12 @@ object Graph {
   }
 
   /**
-  * Creates a Graph from a DataSet of Tuples representing the edges.
-  * The first field of the Tuple3 object for edges will become the source ID,
-  * the second field will become the target ID, and the third field will become
-  * the edge value. 
-  * Vertices are created automatically and their values are set to NullValue.
-  */
+   * Creates a Graph from a DataSet of Tuples representing the edges.
+   * The first field of the Tuple3 object for edges will become the source ID,
+   * the second field will become the target ID, and the third field will 
become
+   * the edge value.
+   * Vertices are created automatically and their values are set to NullValue.
+   */
   def fromTupleDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : 
ClassTag]
   (edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, NullValue, 
EV] = {
     val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, 
v._3)).javaSet
@@ -129,13 +129,13 @@ object Graph {
   }
 
   /**
-  * Creates a Graph from a DataSet of Tuples representing the edges.
-  * The first field of the Tuple3 object for edges will become the source ID,
-  * the second field will become the target ID, and the third field will become
-  * the edge value. 
-  * Vertices are created automatically and their values are set by applying 
the provided
-  * vertexValueInitializer map function to the vertex ids.
-  */
+   * Creates a Graph from a DataSet of Tuples representing the edges.
+   * The first field of the Tuple3 object for edges will become the source ID,
+   * the second field will become the target ID, and the third field will 
become
+   * the edge value.
+   * Vertices are created automatically and their values are set by applying 
the provided
+   * vertexValueInitializer map function to the vertex ids.
+   */
   def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : 
ClassTag, EV:
   TypeInformation : ClassTag](edges: DataSet[(K, K, EV)],
   vertexValueInitializer: MapFunction[K, VV], env: ExecutionEnvironment): 
Graph[K, VV, EV] = {
@@ -144,12 +144,12 @@ object Graph {
       env.getJavaEnv))
   }
 
-    /**
-  * Creates a Graph from a DataSet of Tuple2's representing the edges.
-  * The first field of the Tuple2 object for edges will become the source ID,
-  * the second field will become the target ID. The edge value will be set to 
NullValue.
-  * Vertices are created automatically and their values are set to NullValue.
-  */
+  /**
+   * Creates a Graph from a DataSet of Tuple2's representing the edges.
+   * The first field of the Tuple2 object for edges will become the source ID,
+   * the second field will become the target ID. The edge value will be set to 
NullValue.
+   * Vertices are created automatically and their values are set to NullValue.
+   */
   def fromTuple2DataSet[K: TypeInformation : ClassTag](edges: DataSet[(K, K)],
   env: ExecutionEnvironment): Graph[K, NullValue, NullValue] = {
     val javaTupleEdges = edges.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet
@@ -157,12 +157,12 @@ object Graph {
   }
 
   /**
-  * Creates a Graph from a DataSet of Tuple2's representing the edges.
-  * The first field of the Tuple2 object for edges will become the source ID,
-  * the second field will become the target ID. The edge value will be set to 
NullValue.
-  * Vertices are created automatically and their values are set by applying 
the provided
-  * vertexValueInitializer map function to the vertex IDs.
-  */
+   * Creates a Graph from a DataSet of Tuple2's representing the edges.
+   * The first field of the Tuple2 object for edges will become the source ID,
+   * the second field will become the target ID. The edge value will be set to 
NullValue.
+   * Vertices are created automatically and their values are set by applying 
the provided
+   * vertexValueInitializer map function to the vertex IDs.
+   */
   def fromTuple2DataSet[K: TypeInformation : ClassTag, VV: TypeInformation : 
ClassTag]
   (edges: DataSet[(K, K)], vertexValueInitializer: MapFunction[K, VV],
   env: ExecutionEnvironment): Graph[K, VV, NullValue] = {
@@ -172,52 +172,52 @@ object Graph {
   }
 
   /** Creates a Graph from a CSV file of edges.
-    *
-    * The edge value is read from the CSV file if [[EV]] is not of type 
[[NullValue]]. Otherwise the
-    * edge value is set to [[NullValue]].
-    *
-    * If the vertex value type [[VV]] is specified (unequal [[NullValue]]), 
then the vertex values
-    * are read from the file specified by pathVertices. If the path has not 
been specified then the
-    * vertexValueInitializer is used to initialize the vertex values of the 
vertices extracted from
-    * the set of edges. If the vertexValueInitializer has not been set either, 
then the method
-    * fails.
-    *
-    * @param env The Execution Environment.
-    * @param pathEdges The file path containing the edges.
-    * @param pathVertices The file path containing the vertices.
-    * @param lineDelimiterVertices The string that separates lines in the 
vertices file. It defaults
-    *                              to newline.
-    * @param fieldDelimiterVertices The string that separates vertex Ids from 
vertex values in the
-    *                               vertices file.
-    * @param quoteCharacterVertices The character to use for quoted String 
parsing in the vertices
-    *                               file. Disabled by default.
-    * @param ignoreFirstLineVertices Whether the first line in the vertices 
file should be ignored.
-    * @param ignoreCommentsVertices Lines that start with the given String in 
the vertices file
-    *                               are ignored, disabled by default.
-    * @param lenientVertices Whether the parser should silently ignore 
malformed lines in the
-    *                        vertices file.
-    * @param includedFieldsVertices The fields in the vertices file that 
should be read. By default
-    *                               all fields are read.
-    * @param lineDelimiterEdges The string that separates lines in the edges 
file. It defaults to
-    *                           newline.
-    * @param fieldDelimiterEdges The string that separates fields in the edges 
file.
-    * @param quoteCharacterEdges The character to use for quoted String 
parsing in the edges file.
-    *                            Disabled by default.
-    * @param ignoreFirstLineEdges Whether the first line in the vertices file 
should be ignored.
-    * @param ignoreCommentsEdges Lines that start with the given String in the 
edges file are
-    *                            ignored, disabled by default.
-    * @param lenientEdges Whether the parser should silently ignore malformed 
lines in the edges
-    *                     file.
-    * @param includedFieldsEdges The fields in the edges file that should be 
read. By default all
-    *                            fields are read.
-    * @param vertexValueInitializer  If no vertex values are provided, this 
mapper can be used to
-    *                                initialize them, by applying a map 
transformation on the vertex
-    *                                IDs.
-    * @tparam K Vertex key type
-    * @tparam VV Vertex value type
-    * @tparam EV Edge value type
-    * @return Graph with vertices and edges read from the given files.
-    */
+   *
+   * The edge value is read from the CSV file if [[EV]] is not of type 
[[NullValue]]. Otherwise the
+   * edge value is set to [[NullValue]].
+   *
+   * If the vertex value type [[VV]] is specified (unequal [[NullValue]]), 
then the vertex values
+   * are read from the file specified by pathVertices. If the path has not 
been specified then the
+   * vertexValueInitializer is used to initialize the vertex values of the 
vertices extracted from
+   * the set of edges. If the vertexValueInitializer has not been set either, 
then the method
+   * fails.
+   *
+   * @param env The Execution Environment.
+   * @param pathEdges The file path containing the edges.
+   * @param pathVertices The file path containing the vertices.
+   * @param lineDelimiterVertices The string that separates lines in the 
vertices file. It defaults
+   *                              to newline.
+   * @param fieldDelimiterVertices The string that separates vertex Ids from 
vertex values in the
+   *                               vertices file.
+   * @param quoteCharacterVertices The character to use for quoted String 
parsing in the vertices
+   *                               file. Disabled by default.
+   * @param ignoreFirstLineVertices Whether the first line in the vertices 
file should be ignored.
+   * @param ignoreCommentsVertices Lines that start with the given String in 
the vertices file
+   *                               are ignored, disabled by default.
+   * @param lenientVertices Whether the parser should silently ignore 
malformed lines in the
+   *                        vertices file.
+   * @param includedFieldsVertices The fields in the vertices file that should 
be read. By default
+   *                               all fields are read.
+   * @param lineDelimiterEdges The string that separates lines in the edges 
file. It defaults to
+   *                           newline.
+   * @param fieldDelimiterEdges The string that separates fields in the edges 
file.
+   * @param quoteCharacterEdges The character to use for quoted String parsing 
in the edges file.
+   *                            Disabled by default.
+   * @param ignoreFirstLineEdges Whether the first line in the vertices file 
should be ignored.
+   * @param ignoreCommentsEdges Lines that start with the given String in the 
edges file are
+   *                            ignored, disabled by default.
+   * @param lenientEdges Whether the parser should silently ignore malformed 
lines in the edges
+   *                     file.
+   * @param includedFieldsEdges The fields in the edges file that should be 
read. By default all
+   *                            fields are read.
+   * @param vertexValueInitializer  If no vertex values are provided, this 
mapper can be used to
+   *                                initialize them, by applying a map 
transformation on the vertex
+   *                                IDs.
+   * @tparam K Vertex key type
+   * @tparam VV Vertex value type
+   * @tparam EV Edge value type
+   * @return Graph with vertices and edges read from the given files.
+   */
   // scalastyle:off
   // This method exceeds the max allowed number of parameters -->  
   def fromCsvReader[
@@ -295,6 +295,7 @@ object Graph {
 
 /**
  * Represents a graph consisting of {@link Edge edges} and {@link Vertex 
vertices}.
+ *
  * @param jgraph the underlying java api Graph.
  * @tparam K the key type for vertex and edge identifiers
  * @tparam VV the value type for vertices
@@ -341,9 +342,9 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }
 
   /**
-  * @return a DataSet of Triplets,
-  * consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, 
edgeValue)
-  */
+   * @return a DataSet of Triplets,
+   * consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, 
edgeValue)
+   */
   def getTriplets(): DataSet[Triplet[K, VV, EV]] = {
     wrap(jgraph.getTriplets())
   }
@@ -418,11 +419,11 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }
 
   /**
-    * Translate vertex and edge IDs using the given function.
-    *
-    * @param fun implements conversion from K to NEW
-    * @return graph with translated vertex and edge IDs
-    */
+   * Translate vertex and edge IDs using the given function.
+   *
+   * @param fun implements conversion from K to NEW
+   * @return graph with translated vertex and edge IDs
+   */
   def translateGraphIds[NEW: TypeInformation : ClassTag](fun: (K, NEW) => NEW):
   Graph[NEW, VV, EV] = {
     val translator: TranslateFunction[K, NEW] = new TranslateFunction[K, NEW] {
@@ -446,11 +447,11 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }
 
   /**
-    * Translate vertex values using the given function.
-    *
-    * @param fun implements conversion from VV to NEW
-    * @return graph with translated vertex values
-    */
+   * Translate vertex values using the given function.
+   *
+   * @param fun implements conversion from VV to NEW
+   * @return graph with translated vertex values
+   */
   def translateVertexValues[NEW: TypeInformation : ClassTag](fun: (VV, NEW) => 
NEW):
   Graph[K, NEW, EV] = {
     val translator: TranslateFunction[VV, NEW] = new TranslateFunction[VV, 
NEW] {
@@ -474,11 +475,11 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }
 
   /**
-    * Translate edge values using the given function.
-    *
-    * @param fun implements conversion from EV to NEW
-    * @return graph with translated edge values
-    */
+   * Translate edge values using the given function.
+   *
+   * @param fun implements conversion from EV to NEW
+   * @return graph with translated edge values
+   */
   def translateEdgeValues[NEW: TypeInformation : ClassTag](fun: (EV, NEW) => 
NEW):
   Graph[K, VV, NEW] = {
     val translator: TranslateFunction[EV, NEW] = new TranslateFunction[EV, 
NEW] {
@@ -503,9 +504,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * of the matched Tuple2 from the input DataSet.
    * @return a new Graph, where the vertex values have been updated according 
to the
    * result of the vertexJoinFunction.
-   * 
    * @tparam T the type of the second field of the input Tuple2 DataSet.
-  */
+   */
   def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)],
   vertexJoinFunction: VertexJoinFunction[VV, T]): Graph[K, VV, EV] = {
     val javaTupleSet = inputDataSet.map(scalatuple => new 
jtuple.Tuple2(scalatuple._1,
@@ -526,9 +526,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * of the matched Tuple2 from the input DataSet.
    * @return a new Graph, where the vertex values have been updated according 
to the
    * result of the vertexJoinFunction.
-   * 
    * @tparam T the type of the second field of the input Tuple2 DataSet.
-  */
+   */
   def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: 
(VV, T) => VV):
   Graph[K, VV, EV] = {
     val newVertexJoin = new VertexJoinFunction[VV, T]() {
@@ -554,11 +553,10 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @param edgeJoinFunction the transformation function to apply.
    * The first parameter is the current edge value and the second parameter is 
the value
    * of the matched Tuple3 from the input DataSet.
-   * 
    * @tparam T the type of the third field of the input Tuple3 DataSet.
    * @return a new Graph, where the edge values have been updated according to 
the
    * result of the edgeJoinFunction.
-  */
+   */
   def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)],
   edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = {
     val javaTupleSet = inputDataSet.map(scalatuple => new 
jtuple.Tuple3(scalatuple._1,
@@ -577,11 +575,10 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @param fun the transformation function to apply.
    * The first parameter is the current edge value and the second parameter is 
the value
    * of the matched Tuple3 from the input DataSet.
-   * 
    * @tparam T the type of the third field of the input Tuple3 DataSet.
    * @return a new Graph, where the edge values have been updated according to 
the
    * result of the edgeJoinFunction.
-  */
+   */
   def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], fun: 
(EV, T) => EV):
   Graph[K, VV, EV] = {
     val newEdgeJoin = new EdgeJoinFunction[EV, T]() {
@@ -611,7 +608,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @tparam T the type of the second field of the input Tuple2 DataSet.
    * @return a new Graph, where the edge values have been updated according to 
the
    * result of the edgeJoinFunction.
-  */
+   */
   def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)],
   edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = {
     val javaTupleSet = inputDataSet.map(scalatuple => new 
jtuple.Tuple2(scalatuple._1,
@@ -634,7 +631,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @tparam T the type of the second field of the input Tuple2 DataSet.
    * @return a new Graph, where the edge values have been updated according to 
the
    * result of the edgeJoinFunction.
-  */
+   */
   def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)], 
fun: (EV, T) =>
     EV): Graph[K, VV, EV] = {
     val newEdgeJoin = new EdgeJoinFunction[EV, T]() {
@@ -664,7 +661,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @tparam T the type of the second field of the input Tuple2 DataSet.
    * @return a new Graph, where the edge values have been updated according to 
the
    * result of the edgeJoinFunction.
-  */
+   */
   def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)],
   edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = {
     val javaTupleSet = inputDataSet.map(scalatuple => new 
jtuple.Tuple2(scalatuple._1,
@@ -687,7 +684,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @tparam T the type of the second field of the input Tuple2 DataSet.
    * @return a new Graph, where the edge values have been updated according to 
the
    * result of the edgeJoinFunction.
-  */
+   */
   def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)], 
fun: (EV, T) =>
     EV): Graph[K, VV, EV] = {
     val newEdgeJoin = new EdgeJoinFunction[EV, T]() {
@@ -945,30 +942,30 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }
 
   /**
-  * Adds the list of vertices, passed as input, to the graph.
-  * If the vertices already exist in the graph, they will not be added once 
more.
-  *
-  * @param vertices the list of vertices to add
-  * @return the new graph containing the existing and newly added vertices
-  */
+   * Adds the list of vertices, passed as input, to the graph.
+   * If the vertices already exist in the graph, they will not be added once 
more.
+   *
+   * @param vertices the list of vertices to add
+   * @return the new graph containing the existing and newly added vertices
+   */
   def addVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV] = {
     wrapGraph(jgraph.addVertices(vertices.asJava))
   }
 
   /**
-  * Adds the given list edges to the graph.
-  *
-  * When adding an edge for a non-existing set of vertices,
-  * the edge is considered invalid and ignored.
-  *
-  * @param edges the data set of edges to be added
-  * @return a new graph containing the existing edges plus the newly added 
edges.
-  */
+   * Adds the given list edges to the graph.
+   *
+   * When adding an edge for a non-existing set of vertices,
+   * the edge is considered invalid and ignored.
+   *
+   * @param edges the data set of edges to be added
+   * @return a new graph containing the existing edges plus the newly added edges.
+   */
   def addEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] = {
     wrapGraph(jgraph.addEdges(edges.asJava))
   }
 
-    /**
+  /**
    * Adds the given edge to the graph. If the source and target vertices do
    * not exist in the graph, they will also be added.
    *
@@ -993,7 +990,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
     wrapGraph(jgraph.removeVertex(vertex))
   }
 
-    /**
+  /**
    * Removes the given vertex and its edges from the graph.
    *
    * @param vertices list of vertices to remove
@@ -1037,12 +1034,13 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }
 
   /**
-  * Performs Difference on the vertex and edge sets of the input graphs
-  * removes common vertices and edges. If a source/target vertex is removed,
-  * its corresponding edge will also be removed
-  * @param graph the graph to perform difference with
-  * @return a new graph where the common vertices and edges have been removed
-  */
+   * Performs Difference on the vertex and edge sets of the input graphs
+   * removes common vertices and edges. If a source/target vertex is removed,
+   * its corresponding edge will also be removed
+   *
+   * @param graph the graph to perform difference with
+   * @return a new graph where the common vertices and edges have been removed
+   */
   def difference(graph: Graph[K, VV, EV]) = {
     wrapGraph(jgraph.difference(graph.getWrappedGraph))
   }
@@ -1135,36 +1133,34 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * Runs a scatter-gather iteration on the graph.
    * No configuration options are provided.
    *
-   * @param vertexUpdateFunction the vertex update function
-   * @param messagingFunction the messaging function
+   * @param scatterFunction the scatter function
+   * @param gatherFunction the gather function
    * @param maxIterations maximum number of iterations to perform
-   *
    * @return the updated Graph after the scatter-gather iteration has converged or
    *         after maximumNumberOfIterations.
    */
-  def runScatterGatherIteration[M](vertexUpdateFunction: VertexUpdateFunction[K, VV, M],
-                                   messagingFunction: MessagingFunction[K, VV, M, EV],
+  def runScatterGatherIteration[M](scatterFunction: ScatterFunction[K, VV, M, EV],
+                                   gatherFunction: SpargelGatherFunction[K, VV, M],
                                    maxIterations: Int): Graph[K, VV, EV] = {
-    wrapGraph(jgraph.runScatterGatherIteration(vertexUpdateFunction, messagingFunction,
+    wrapGraph(jgraph.runScatterGatherIteration(scatterFunction, gatherFunction,
       maxIterations))
   }
 
   /**
    * Runs a scatter-gather iteration on the graph with configuration options.
    *
-   * @param vertexUpdateFunction the vertex update function
-   * @param messagingFunction the messaging function
+   * @param scatterFunction the scatter function
+   * @param gatherFunction the gather function
    * @param maxIterations maximum number of iterations to perform
    * @param parameters the iteration configuration parameters
-   *
    * @return the updated Graph after the scatter-gather iteration has converged or
    *         after maximumNumberOfIterations.
    */
-  def runScatterGatherIteration[M](vertexUpdateFunction: VertexUpdateFunction[K, VV, M],
-                                   messagingFunction: MessagingFunction[K, VV, M, EV],
+  def runScatterGatherIteration[M](scatterFunction: ScatterFunction[K, VV, M, EV],
+                                   gatherFunction: SpargelGatherFunction[K, VV, M],
                                    maxIterations: Int, parameters: ScatterGatherConfiguration):
   Graph[K, VV, EV] = {
-    wrapGraph(jgraph.runScatterGatherIteration(vertexUpdateFunction, messagingFunction,
+    wrapGraph(jgraph.runScatterGatherIteration(scatterFunction, gatherFunction,
       maxIterations, parameters))
   }
 
@@ -1178,11 +1174,10 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @param applyFunction the apply function updates the vertex values with the aggregates
    * @param maxIterations maximum number of iterations to perform
    * @tparam M the intermediate type used between gather, sum and apply
-   *
    * @return the updated Graph after the gather-sum-apply iteration has converged or
    *         after maximumNumberOfIterations.
    */
-  def runGatherSumApplyIteration[M](gatherFunction: GatherFunction[VV, EV, M], sumFunction:
+  def runGatherSumApplyIteration[M](gatherFunction: GSAGatherFunction[VV, EV, M], sumFunction:
   SumFunction[VV, EV, M], applyFunction: ApplyFunction[K, VV, M], maxIterations: Int): Graph[K,
     VV, EV] = {
     wrapGraph(jgraph.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction,
@@ -1199,25 +1194,23 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @param maxIterations maximum number of iterations to perform
    * @param parameters the iteration configuration parameters
    * @tparam M the intermediate type used between gather, sum and apply
-   *
    * @return the updated Graph after the gather-sum-apply iteration has converged or
    *         after maximumNumberOfIterations.
    */
-  def runGatherSumApplyIteration[M](gatherFunction: GatherFunction[VV, EV, M], sumFunction:
+  def runGatherSumApplyIteration[M](gatherFunction: GSAGatherFunction[VV, EV, M], sumFunction:
   SumFunction[VV, EV, M], applyFunction: ApplyFunction[K, VV, M], maxIterations: Int,
                                     parameters: GSAConfiguration): Graph[K, VV, EV] = {
     wrapGraph(jgraph.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction,
       maxIterations, parameters))
   }
 
-   /**
+  /**
    * Runs a vertex-centric iteration on the graph.
    * No configuration options are provided.
    *
    * @param computeFunction the compute function
    * @param combineFunction the optional message combiner function
    * @param maxIterations maximum number of iterations to perform
-   *
    * @return the updated Graph after the vertex-centric iteration has converged or
    *         after maximumNumberOfIterations.
    */
@@ -1235,7 +1228,6 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @param combineFunction the optional message combiner function
    * @param maxIterations maximum number of iterations to perform
    * @param parameters the iteration configuration parameters
-   *
    * @return the updated Graph after the vertex-centric iteration has converged or
    *         after maximumNumberOfIterations.
    */

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index fe59283..821b0a7 100755
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -45,17 +45,15 @@ import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.asm.translate.TranslateVertexValues;
 import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GSAConfiguration;
-import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.GatherSumApplyIteration;
 import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.pregel.ComputeFunction;
 import org.apache.flink.graph.pregel.MessageCombiner;
 import org.apache.flink.graph.pregel.VertexCentricConfiguration;
 import org.apache.flink.graph.pregel.VertexCentricIteration;
-import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
 import org.apache.flink.graph.spargel.ScatterGatherIteration;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.graph.utils.EdgeToTuple3Map;
 import org.apache.flink.graph.utils.Tuple2ToVertexMap;
 import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
@@ -1652,27 +1650,27 @@ public class Graph<K, VV, EV> {
         * Runs a ScatterGather iteration on the graph.
         * No configuration options are provided.
         *
-        * @param vertexUpdateFunction the vertex update function
-        * @param messagingFunction the messaging function
+        * @param scatterFunction the scatter function
+        * @param gatherFunction the gather function
         * @param maximumNumberOfIterations maximum number of iterations to perform
         * 
         * @return the updated Graph after the scatter-gather iteration has converged or
         * after maximumNumberOfIterations.
         */
        public <M> Graph<K, VV, EV> runScatterGatherIteration(
-                       VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
-                       MessagingFunction<K, VV, M, EV> messagingFunction,
+                       ScatterFunction<K, VV, M, EV> scatterFunction,
+                       org.apache.flink.graph.spargel.GatherFunction<K, VV, M> gatherFunction,
                        int maximumNumberOfIterations) {
 
-               return this.runScatterGatherIteration(vertexUpdateFunction, messagingFunction,
+               return this.runScatterGatherIteration(scatterFunction, gatherFunction,
                                maximumNumberOfIterations, null);
        }
 
        /**
         * Runs a ScatterGather iteration on the graph with configuration options.
-        * 
-        * @param vertexUpdateFunction the vertex update function
-        * @param messagingFunction the messaging function
+        *
+        * @param scatterFunction the scatter function
+        * @param gatherFunction the gather function
         * @param maximumNumberOfIterations maximum number of iterations to perform
         * @param parameters the iteration configuration parameters
         * 
@@ -1680,12 +1678,12 @@ public class Graph<K, VV, EV> {
         * after maximumNumberOfIterations.
         */
        public <M> Graph<K, VV, EV> runScatterGatherIteration(
-                       VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
-                       MessagingFunction<K, VV, M, EV> messagingFunction,
+                       ScatterFunction<K, VV, M, EV> scatterFunction,
+                       org.apache.flink.graph.spargel.GatherFunction<K, VV, M> gatherFunction,
                        int maximumNumberOfIterations, ScatterGatherConfiguration parameters) {
 
                ScatterGatherIteration<K, VV, M, EV> iteration = ScatterGatherIteration.withEdges(
-                               edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations);
+                               edges, scatterFunction, gatherFunction, maximumNumberOfIterations);
 
                iteration.configure(parameters);
 
@@ -1708,7 +1706,7 @@ public class Graph<K, VV, EV> {
         * after maximumNumberOfIterations.
         */
        public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
-                       GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
+                       org.apache.flink.graph.gsa.GatherFunction gatherFunction, SumFunction<VV, EV, M> sumFunction,
                        ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations) {
 
                return this.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction,
@@ -1729,7 +1727,7 @@ public class Graph<K, VV, EV> {
         * after maximumNumberOfIterations.
         */
        public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
-                       GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
+                       org.apache.flink.graph.gsa.GatherFunction gatherFunction, SumFunction<VV, EV, M> sumFunction,
                        ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations,
                        GSAConfiguration parameters) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
index 7578420..0b98a27 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -133,8 +134,8 @@ public abstract class IterationConfiguration {
 
        /**
         * Registers a new aggregator. Aggregators registered here are available during the execution of the vertex updates
-        * via {@link org.apache.flink.graph.spargel.VertexUpdateFunction#getIterationAggregator(String)} and
-        * {@link org.apache.flink.graph.spargel.VertexUpdateFunction#getPreviousIterationAggregate(String)}.
+        * via {@link GatherFunction#getIterationAggregator(String)} and
+        * {@link GatherFunction#getPreviousIterationAggregate(String)}.
         * 
         * @param name The name of the aggregator, used to retrieve it and its aggregates during execution. 
         * @param aggregator The aggregator.

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
index 4d68661..f554680 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
@@ -26,9 +26,9 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 
 import java.util.Map;
 import java.util.TreeMap;
@@ -73,19 +73,33 @@ public class CommunityDetection<K> implements 
GraphAlgorithm<K, Long, Double, Gr
        public Graph<K, Long, Double> run(Graph<K, Long, Double> graph) {
 
                DataSet<Vertex<K, Tuple2<Long, Double>>> initializedVertices = 
graph.getVertices()
-                               .map(new AddScoreToVertexValuesMapper<K>());
+                       .map(new AddScoreToVertexValuesMapper<K>());
 
                Graph<K, Tuple2<Long, Double>, Double> graphWithScoredVertices =
-                               Graph.fromDataSet(initializedVertices, 
graph.getEdges(), graph.getContext()).getUndirected();
+                       Graph.fromDataSet(initializedVertices, 
graph.getEdges(), graph.getContext()).getUndirected();
 
-               return graphWithScoredVertices.runScatterGatherIteration(new VertexLabelUpdater<K>(delta),
-                               new LabelMessenger<K>(), maxIterations)
+               return graphWithScoredVertices.runScatterGatherIteration(new LabelMessenger<K>(),
+                       new VertexLabelUpdater<K>(delta), maxIterations)
                                .mapVertices(new RemoveScoreFromVertexValuesMapper<K>());
        }
 
        @SuppressWarnings("serial")
-       public static final class VertexLabelUpdater<K> extends 
VertexUpdateFunction<
-               K, Tuple2<Long, Double>, Tuple2<Long, Double>> {
+       public static final class LabelMessenger<K> extends ScatterFunction<K, 
Tuple2<Long, Double>,
+                       Tuple2<Long, Double>, Double> {
+
+               @Override
+               public void sendMessages(Vertex<K, Tuple2<Long, Double>> 
vertex) throws Exception {
+
+                       for(Edge<K, Double> edge : getEdges()) {
+                               sendMessageTo(edge.getTarget(), new 
Tuple2<Long, Double>(vertex.getValue().f0,
+                                       vertex.getValue().f1 * 
edge.getValue()));
+                       }
+               }
+       }
+
+       @SuppressWarnings("serial")
+       public static final class VertexLabelUpdater<K> extends GatherFunction<
+                       K, Tuple2<Long, Double>, Tuple2<Long, Double>> {
 
                private Double delta;
 
@@ -154,27 +168,13 @@ public class CommunityDetection<K> implements 
GraphAlgorithm<K, Long, Double, Gr
        }
 
        @SuppressWarnings("serial")
-       public static final class LabelMessenger<K> extends 
MessagingFunction<K, Tuple2<Long, Double>,
-                       Tuple2<Long, Double>, Double> {
-
-               @Override
-               public void sendMessages(Vertex<K, Tuple2<Long, Double>> 
vertex) throws Exception {
-
-                       for(Edge<K, Double> edge : getEdges()) {
-                               sendMessageTo(edge.getTarget(), new 
Tuple2<Long, Double>(vertex.getValue().f0,
-                                               vertex.getValue().f1 * 
edge.getValue()));
-                       }
-               }
-       }
-
-       @SuppressWarnings("serial")
        @ForwardedFields("f0")
        public static final class AddScoreToVertexValuesMapper<K> implements 
MapFunction<
                Vertex<K, Long>, Vertex<K, Tuple2<Long, Double>>> {
 
                public Vertex<K, Tuple2<Long, Double>> map(Vertex<K, Long> 
vertex) {
                        return new Vertex<K, Tuple2<Long, Double>>(
-                                       vertex.getId(), new Tuple2<Long, 
Double>(vertex.getValue(), 1.0));
+                               vertex.getId(), new Tuple2<Long, 
Double>(vertex.getValue(), 1.0));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
index efc32c1..3cd8f05 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
@@ -25,9 +25,9 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.utils.NullValueEdgeMapper;
 import org.apache.flink.types.NullValue;
 
@@ -76,39 +76,16 @@ public class ConnectedComponents<K, VV extends Comparable<VV>, EV>
                        .getUndirected();
 
                return undirectedGraph.runScatterGatherIteration(
-                       new CCUpdater<K, VV>(),
                        new CCMessenger<K, VV>(valueTypeInfo),
+                       new CCUpdater<K, VV>(),
                        maxIterations).getVertices();
        }
 
        /**
-        * Updates the value of a vertex by picking the minimum neighbor value out of all the incoming messages.
-        */
-       public static final class CCUpdater<K, VV extends Comparable<VV>>
-               extends VertexUpdateFunction<K, VV, VV> {
-
-               @Override
-               public void updateVertex(Vertex<K, VV> vertex, MessageIterator<VV> messages) throws Exception {
-                       VV current = vertex.getValue();
-                       VV min = current;
-
-                       for (VV msg : messages) {
-                               if (msg.compareTo(min) < 0) {
-                                       min = msg;
-                               }
-                       }
-
-                       if (!min.equals(current)) {
-                               setNewVertexValue(min);
-                       }
-               }
-       }
-
-       /**
         * Sends the current vertex value to all adjacent vertices.
         */
        public static final class CCMessenger<K, VV extends Comparable<VV>>
-               extends MessagingFunction<K, VV, VV, NullValue>
+               extends ScatterFunction<K, VV, VV, NullValue>
                implements ResultTypeQueryable<VV> {
 
                private final TypeInformation<VV> typeInformation;
@@ -128,4 +105,27 @@ public class ConnectedComponents<K, VV extends Comparable<VV>, EV>
                        return typeInformation;
                }
        }
+
+       /**
+        * Updates the value of a vertex by picking the minimum neighbor value out of all the incoming messages.
+        */
+       public static final class CCUpdater<K, VV extends Comparable<VV>>
+               extends GatherFunction<K, VV, VV> {
+
+               @Override
+               public void updateVertex(Vertex<K, VV> vertex, MessageIterator<VV> messages) throws Exception {
+                       VV current = vertex.getValue();
+                       VV min = current;
+
+                       for (VV msg : messages) {
+                               if (msg.compareTo(min) < 0) {
+                                       min = msg;
+                               }
+                       }
+
+                       if (!min.equals(current)) {
+                               setNewVertexValue(min);
+                       }
+               }
+       }
 }
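
Note on the ConnectedComponents change above: the scatter step (CCMessenger) now comes first in the argument list and the gather step (CCUpdater) second, matching the order in which a superstep runs. The following is a minimal plain-Java sketch of that scatter-then-gather loop, assuming a symmetric adjacency map in which every vertex appears as a key. It does not use Flink or Gelly; the class ScatterGatherSketch and the method minLabelComponents are hypothetical names chosen for illustration, and restricting each scatter phase to vertices updated in the previous superstep is a simplification made in this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only: not part of Flink or Gelly.
public class ScatterGatherSketch {

    /**
     * Labels each vertex with the smallest vertex id reachable from it.
     * Assumes the adjacency map is symmetric and lists every vertex as a key.
     */
    static Map<Integer, Integer> minLabelComponents(
            Map<Integer, List<Integer>> adjacency, int maxIterations) {

        // Every vertex starts with its own id as its label.
        Map<Integer, Integer> labels = new HashMap<>();
        for (Integer vertex : adjacency.keySet()) {
            labels.put(vertex, vertex);
        }

        // In the first superstep every vertex scatters.
        Set<Integer> active = new HashSet<>(adjacency.keySet());

        for (int step = 0; step < maxIterations && !active.isEmpty(); step++) {
            // Scatter: each active vertex sends its current label to its neighbors.
            Map<Integer, List<Integer>> inbox = new HashMap<>();
            for (Integer vertex : active) {
                for (Integer neighbor : adjacency.get(vertex)) {
                    inbox.computeIfAbsent(neighbor, k -> new ArrayList<>())
                         .add(labels.get(vertex));
                }
            }

            // Gather: each vertex with messages keeps the minimum label seen.
            Set<Integer> updated = new HashSet<>();
            for (Map.Entry<Integer, List<Integer>> entry : inbox.entrySet()) {
                int current = labels.get(entry.getKey());
                int min = current;
                for (int message : entry.getValue()) {
                    if (message < min) {
                        min = message;
                    }
                }
                if (min != current) {
                    labels.put(entry.getKey(), min);
                    updated.add(entry.getKey());
                }
            }

            // Only vertices whose label changed scatter in the next superstep.
            active = updated;
        }
        return labels;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> graph = new HashMap<>();
        graph.put(1, List.of(2));
        graph.put(2, List.of(1, 3));
        graph.put(3, List.of(2));
        graph.put(4, List.of(5));
        graph.put(5, List.of(4));
        System.out.println(minLabelComponents(graph, 10));
    }
}
```

The gather loop mirrors the body of CCUpdater.updateVertex in the diff: it starts from the current value, takes the minimum over the incoming messages, and writes a new value only when the minimum differs.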

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
index a12cb20..327de73 100755
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
@@ -27,8 +27,8 @@ import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.utils.NullValueEdgeMapper;
 import org.apache.flink.types.NullValue;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
index 5899fa0..f39d858 100755
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
@@ -25,8 +25,8 @@ import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.gsa.SumFunction;
 
 /**
  * This is an implementation of the Single Source Shortest Paths algorithm, 
using a gather-sum-apply iteration

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
index fef6808..2d13dfd 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -25,9 +25,9 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.utils.NullValueEdgeMapper;
 import org.apache.flink.types.NullValue;
 
@@ -78,15 +78,38 @@ public class LabelPropagation<K, VV extends Comparable<VV>, 
EV>
                return input
                        .mapEdges(new NullValueEdgeMapper<K, EV>())
                        .runScatterGatherIteration(
-                               new UpdateVertexLabel<K, VV>(), new SendNewLabelToNeighbors<K, VV>(valueType), maxIterations)
+                               new SendNewLabelToNeighbors<K, VV>(valueType), new UpdateVertexLabel<K, VV>(), maxIterations)
                        .getVertices();
        }
 
        /**
+        * Sends the vertex label to all out-neighbors
+        */
+       public static final class SendNewLabelToNeighbors<K, VV extends 
Comparable<VV>>
+               extends ScatterFunction<K, VV, VV, NullValue>
+               implements ResultTypeQueryable<VV> {
+
+               private final TypeInformation<VV> typeInformation;
+
+               public SendNewLabelToNeighbors(TypeInformation<VV> 
typeInformation) {
+                       this.typeInformation = typeInformation;
+               }
+
+               public void sendMessages(Vertex<K, VV> vertex) {
+                       sendMessageToAllNeighbors(vertex.getValue());
+               }
+
+               @Override
+               public TypeInformation<VV> getProducedType() {
+                       return typeInformation;
+               }
+       }
+
+       /**
         * Function that updates the value of a vertex by adopting the most 
frequent
         * label among its in-neighbors
         */
-       public static final class UpdateVertexLabel<K, VV extends 
Comparable<VV>> extends VertexUpdateFunction<K, VV, VV> {
+       public static final class UpdateVertexLabel<K, VV extends 
Comparable<VV>> extends GatherFunction<K, VV, VV> {
 
                public void updateVertex(Vertex<K, VV> vertex, 
MessageIterator<VV> inMessages) {
                        Map<VV, Long> labelsWithFrequencies = new HashMap<VV, 
Long>();
@@ -119,27 +142,4 @@ public class LabelPropagation<K, VV extends 
Comparable<VV>, EV>
                        setNewVertexValue(mostFrequentLabel);
                }
        }
-
-       /**
-        * Sends the vertex label to all out-neighbors
-        */
-       public static final class SendNewLabelToNeighbors<K, VV extends 
Comparable<VV>>
-               extends MessagingFunction<K, VV, VV, NullValue>
-               implements ResultTypeQueryable<VV> {
-
-               private final TypeInformation<VV> typeInformation;
-
-               public SendNewLabelToNeighbors(TypeInformation<VV> 
typeInformation) {
-                       this.typeInformation = typeInformation;
-               }
-
-               public void sendMessages(Vertex<K, VV> vertex) {
-                       sendMessageToAllNeighbors(vertex.getValue());
-               }
-
-               @Override
-               public TypeInformation<VV> getProducedType() {
-                       return typeInformation;
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index 2f1b03b..bf9b4e9 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -25,10 +25,10 @@ import org.apache.flink.graph.EdgeJoinFunction;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.types.LongValue;
 
 /**
@@ -65,18 +65,37 @@ public class PageRank<K> implements GraphAlgorithm<K, 
Double, Double, DataSet<Ve
                ScatterGatherConfiguration parameters = new 
ScatterGatherConfiguration();
                parameters.setOptNumVertices(true);
 
-               return networkWithWeights.runScatterGatherIteration(new VertexRankUpdater<K>(beta),
-                               new RankMessenger<K>(), maxIterations, parameters)
+               return networkWithWeights.runScatterGatherIteration(new RankMessenger<K>(),
+                               new VertexRankUpdater<K>(beta), maxIterations, parameters)
                                .getVertices();
        }
 
        /**
+        * Distributes the rank of a vertex among all target vertices according 
to
+        * the transition probability, which is associated with an edge as the 
edge
+        * value.
+        */
+       @SuppressWarnings("serial")
+       public static final class RankMessenger<K> extends ScatterFunction<K, 
Double, Double, Double> {
+               @Override
+               public void sendMessages(Vertex<K, Double> vertex) {
+                       if (getSuperstepNumber() == 1) {
+                               // initialize vertex ranks
+                               vertex.setValue(1.0 / this.getNumberOfVertices());
+                       }
+
+                       for (Edge<K, Double> edge : getEdges()) {
+                               sendMessageTo(edge.getTarget(), vertex.getValue() * edge.getValue());
+                       }
+               }
+       }
+
+       /**
         * Function that updates the rank of a vertex by summing up the partial
         * ranks from all incoming messages and then applying the dampening formula.
         */
        @SuppressWarnings("serial")
-       public static final class VertexRankUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
-
+       public static final class VertexRankUpdater<K> extends GatherFunction<K, Double, Double> {
                private final double beta;
 
                public VertexRankUpdater(double beta) {
@@ -96,30 +115,8 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
                }
        }
 
-       /**
-        * Distributes the rank of a vertex among all target vertices according to
-        * the transition probability, which is associated with an edge as the edge
-        * value.
-        */
-       @SuppressWarnings("serial")
-       public static final class RankMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
-
-               @Override
-               public void sendMessages(Vertex<K, Double> vertex) {
-                       if (getSuperstepNumber() == 1) {
-                               // initialize vertex ranks
-                               vertex.setValue(1.0 / this.getNumberOfVertices());
-                       }
-
-                       for (Edge<K, Double> edge : getEdges()) {
-                               sendMessageTo(edge.getTarget(), vertex.getValue() * edge.getValue());
-                       }
-               }
-       }
-
        @SuppressWarnings("serial")
        private static final class InitWeights implements EdgeJoinFunction<Double, LongValue> {
-
                public Double edgeJoin(Double edgeValue, LongValue inputValue) {
                        return edgeValue / (double) inputValue.getValue();
                }
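
For readers following the rename, the scatter-then-gather order that the new
runScatterGatherIteration(ScatterFunction, GatherFunction) signature reflects
can be sketched in plain Java without any Flink dependency. This is only an
illustration, not the Gelly implementation: the class ScatterGatherSketch and
its pageRank method are hypothetical names, every vertex is assumed to scatter
and gather in every superstep, and the gather formula
beta * sum + (1 - beta) / numVertices is an assumption, since the body of
VertexRankUpdater is not shown in this hunk.

```java
import java.util.Arrays;

// Hypothetical stand-alone sketch; not part of Flink or Gelly.
public class ScatterGatherSketch {

    // edges[i] = {source, target}; vertices are numbered 0..numVertices-1.
    public static double[] pageRank(int[][] edges, int numVertices,
                                    double beta, int maxIterations) {
        // Mirrors InitWeights: each edge weight is 1 / out-degree of its source.
        int[] outDegree = new int[numVertices];
        for (int[] e : edges) {
            outDegree[e[0]]++;
        }

        // Mirrors the superstep-1 initialization in RankMessenger.
        double[] rank = new double[numVertices];
        Arrays.fill(rank, 1.0 / numVertices);

        for (int superstep = 1; superstep <= maxIterations; superstep++) {
            // Scatter phase: each vertex sends rank * edge weight along its out-edges.
            double[] inbox = new double[numVertices];
            for (int[] e : edges) {
                inbox[e[1]] += rank[e[0]] / outDegree[e[0]];
            }
            // Gather phase: sum of incoming partial ranks, then the assumed formula.
            for (int v = 0; v < numVertices; v++) {
                rank[v] = beta * inbox[v] + (1 - beta) / numVertices;
            }
        }
        return rank;
    }

    public static void main(String[] args) {
        int[][] cycle = {{0, 1}, {1, 2}, {2, 0}};
        System.out.println(Arrays.toString(pageRank(cycle, 3, 0.85, 10)));
    }
}
```

On a directed 3-cycle every vertex receives exactly one message carrying its
predecessor's full rank, so the ranks stay uniform across supersteps; that
makes the cycle a convenient sanity check for the ordering of the two phases.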
