http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index 29183e9..4ff4e79 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -24,9 +24,9 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 
 /**
  * This is an implementation of the Single-Source-Shortest Paths algorithm, 
using a scatter-gather iteration.
@@ -52,7 +52,7 @@ public class SingleSourceShortestPaths<K> implements 
GraphAlgorithm<K, Double, D
        public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
 
                return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
-                               .runScatterGatherIteration(new 
VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(),
+                               .runScatterGatherIteration(new 
MinDistanceMessenger<K>(), new VertexDistanceUpdater<K>(),
                                maxIterations).getVertices();
        }
 
@@ -74,12 +74,30 @@ public class SingleSourceShortestPaths<K> implements 
GraphAlgorithm<K, Double, D
        }
 
        /**
+        * Sends to every target vertex the distance of the given vertex plus 
the value
+        * of the connecting edge, provided the vertex distance is finite.
+        *
+        * @param <K> the vertex key type
+        */
+       public static final class MinDistanceMessenger<K> extends 
ScatterFunction<K, Double, Double, Double> {
+
+               @Override
+               public void sendMessages(Vertex<K, Double> vertex) {
+                       if (vertex.getValue() < Double.POSITIVE_INFINITY) {
+                               for (Edge<K, Double> edge : getEdges()) {
+                                       sendMessageTo(edge.getTarget(), 
vertex.getValue() + edge.getValue());
+                               }
+                       }
+               }
+       }
+
+       /**
         * Function that updates the value of a vertex by picking the minimum
         * distance from all incoming messages.
         * 
         * @param <K>
         */
-       public static final class VertexDistanceUpdater<K> extends 
VertexUpdateFunction<K, Double, Double> {
+       public static final class VertexDistanceUpdater<K> extends 
GatherFunction<K, Double, Double> {
 
                @Override
                public void updateVertex(Vertex<K, Double> vertex,
@@ -98,22 +116,4 @@ public class SingleSourceShortestPaths<K> implements 
GraphAlgorithm<K, Double, D
                        }
                }
        }
-
-       /**
-        * Distributes the minimum distance associated with a given vertex 
among all
-        * the target vertices summed up with the edge's value.
-        * 
-        * @param <K>
-        */
-       public static final class MinDistanceMessenger<K> extends 
MessagingFunction<K, Double, Double, Double> {
-
-               @Override
-               public void sendMessages(Vertex<K, Double> vertex) {
-                       if (vertex.getValue() < Double.POSITIVE_INFINITY) {
-                               for (Edge<K, Double> edge : getEdges()) {
-                                       sendMessageTo(edge.getTarget(), 
vertex.getValue() + edge.getValue());
-                               }
-                       }
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
index 8272d8f..681d060 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.graph.library;
 
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.DataSet;

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
new file mode 100644
index 0000000..d56c0da
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.spargel;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * This class must be extended by functions that compute the state of the 
vertex depending on the old state and the
+ * incoming messages. The central method is {@link #updateVertex(Vertex, 
MessageIterator)}, which is
+ * invoked once per vertex per superstep.
+ * 
+ * @param <K> The vertex key type.
+ * @param <VV> The vertex value type.
+ * @param <Message> The message type.
+ */
+public abstract class GatherFunction<K, VV, Message> implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Attributes that allow vertices to access their in/out degrees and 
the total number of vertices
+       //  inside an iteration.
+       // 
--------------------------------------------------------------------------------------------
+
+       private long numberOfVertices = -1L;
+
+       /**
+        * Retrieves the number of vertices in the graph.
+        * @return the number of vertices if the {@link 
org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)}
+        * option has been set; -1 otherwise.
+        */
+       public long getNumberOfVertices() {
+               return numberOfVertices;
+       }
+
+       void setNumberOfVertices(long numberOfVertices) {
+               this.numberOfVertices = numberOfVertices;
+       }
+
+       
//---------------------------------------------------------------------------------------------
+
+       private boolean optDegrees;
+
+       boolean isOptDegrees() {
+               return optDegrees;
+       }
+
+       void setOptDegrees(boolean optDegrees) {
+               this.optDegrees = optDegrees;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Public API Methods
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * This method is invoked once per vertex per superstep. It receives 
the current state of the vertex, as well as
+        * the incoming messages. It may set a new vertex state via {@link 
#setNewVertexValue(Object)}. If the vertex
+        * state is changed, it will trigger the sending of messages via the 
{@link ScatterFunction}.
+        * 
+        * @param vertex The vertex.
+        * @param inMessages The incoming messages to this vertex.
+        * 
+        * @throws Exception The computation may throw exceptions, which causes 
the superstep to fail.
+        */
+       public abstract void updateVertex(Vertex<K, VV> vertex, 
MessageIterator<Message> inMessages) throws Exception;
+
+       /**
+        * This method is executed once per superstep before the gather 
function is invoked for each vertex.
+        * 
+        * @throws Exception Exceptions in the pre-superstep phase cause the 
superstep to fail.
+        */
+       public void preSuperstep() throws Exception {}
+
+       /**
+        * This method is executed once per superstep after the gather function 
has been invoked for each vertex.
+        * 
+        * @throws Exception Exceptions in the post-superstep phase cause the 
superstep to fail.
+        */
+       public void postSuperstep() throws Exception {}
+
+       /**
+        * Sets the new value of this vertex. Setting a new value triggers the 
sending of outgoing messages from this vertex.
+        *
+        * This should be called at most once per updateVertex.
+        * 
+        * @param newValue The new vertex value.
+        */
+       public void setNewVertexValue(VV newValue) {
+               if(setNewVertexValueCalled) {
+                       throw new IllegalStateException("setNewVertexValue 
should only be called at most once per updateVertex");
+               }
+               setNewVertexValueCalled = true;
+               if(isOptDegrees()) {
+                       outValWithDegrees.f1.f0 = newValue;
+                       outWithDegrees.collect(outValWithDegrees);
+               } else {
+                       outVal.setValue(newValue);
+                       out.collect(outVal);
+               }
+       }
+
+       /**
+        * Gets the number of the superstep, starting at {@code 1}.
+        * 
+        * @return The number of the current superstep.
+        */
+       public int getSuperstepNumber() {
+               return this.runtimeContext.getSuperstepNumber();
+       }
+
+       /**
+        * Gets the iteration aggregator registered under the given name. The 
iteration aggregator combines
+        * all aggregates globally once per superstep and makes them available 
in the next superstep.
+        * 
+        * @param name The name of the aggregator.
+        * @return The aggregator registered under this name, or null, if no 
aggregator was registered.
+        */
+       public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+               return this.runtimeContext.<T>getIterationAggregator(name);
+       }
+
+       /**
+        * Get the aggregated value that an aggregator computed in the previous 
iteration.
+        * 
+        * @param name The name of the aggregator.
+        * @return The aggregated value of the previous iteration.
+        */
+       public <T extends Value> T getPreviousIterationAggregate(String name) {
+               return 
this.runtimeContext.<T>getPreviousIterationAggregate(name);
+       }
+
+       /**
+        * Gets the broadcast data set registered under the given name. 
Broadcast data sets
+        * are available on all parallel instances of a function. They can be 
registered via
+        * {@link 
org.apache.flink.graph.spargel.ScatterGatherConfiguration#addBroadcastSetForGatherFunction(String,
 org.apache.flink.api.java.DataSet)}.
+        * 
+        * @param name The name under which the broadcast set is registered.
+        * @return The broadcast data set.
+        */
+       public <T> Collection<T> getBroadcastSet(String name) {
+               return this.runtimeContext.<T>getBroadcastVariable(name);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  internal methods
+       // 
--------------------------------------------------------------------------------------------
+
+       private IterationRuntimeContext runtimeContext;
+
+       private Collector<Vertex<K, VV>> out;
+
+       private Collector<Vertex<K, Tuple3<VV, Long, Long>>> outWithDegrees;
+
+       private Vertex<K, VV> outVal;
+
+       private Vertex<K, Tuple3<VV, Long, Long>> outValWithDegrees;
+
+       private long inDegree = -1;
+
+       private long outDegree = -1;
+
+       private boolean setNewVertexValueCalled;
+
+       void init(IterationRuntimeContext context) {
+               this.runtimeContext = context;
+       }
+
+       void setOutput(Vertex<K, VV> outVal, Collector<Vertex<K, VV>> out) {
+               this.outVal = outVal;
+               this.out = out;
+               setNewVertexValueCalled = false;
+       }
+
+       @SuppressWarnings({ "unchecked", "rawtypes" })
+       <ValueWithDegree> void setOutputWithDegrees(Vertex<K, ValueWithDegree> 
outVal,
+                       Collector out) {
+               this.outValWithDegrees = (Vertex<K, Tuple3<VV, Long, Long>>) 
outVal;
+               this.outWithDegrees = out;
+               setNewVertexValueCalled = false;
+       }
+
+       /**
+        * Retrieves the vertex in-degree (number of incoming edges).
+        * @return The in-degree of this vertex
+        */
+       public long getInDegree() {
+               return inDegree;
+       }
+
+       void setInDegree(long inDegree) {
+               this.inDegree = inDegree;
+       }
+
+       /**
+        * Retrieves the vertex out-degree (number of outgoing edges).
+        * @return The out-degree of this vertex
+        */
+       public long getOutDegree() {
+               return outDegree;
+       }
+
+       void setOutDegree(long outDegree) {
+               this.outDegree = outDegree;
+       }
+
+       /**
+        * In order to hide the Tuple3(actualValue, inDegree, OutDegree) vertex 
value from the user,
+        * another function will be called from {@link 
org.apache.flink.graph.spargel.ScatterGatherIteration}.
+        *
+        * This function will extract the key and the actual value of the vertex 
from the vertexState
+        * and then call the regular updateVertex function; it does not set the degrees itself.
+        *
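+        * @param vertexState The vertex whose value wraps the actual vertex value together with its degrees.
+        * @param inMessages The incoming messages to this vertex.
+        * @throws Exception The computation may throw exceptions, which causes the superstep to fail.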
+        */
+       @SuppressWarnings("unchecked")
+       <VertexWithDegree> void 
updateVertexFromScatterGatherIteration(Vertex<K, VertexWithDegree> vertexState,
+                                                                               
                MessageIterator<Message> inMessages) throws Exception {
+
+               Vertex<K, VV> vertex = new Vertex<K, VV>(vertexState.f0,
+                               ((Tuple3<VV, Long, 
Long>)vertexState.getValue()).f0);
+
+               updateVertex(vertex, inMessages);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
deleted file mode 100644
index e12d779..0000000
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.spargel;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-/**
- * The base class for functions that produce messages between vertices as a 
part of a {@link ScatterGatherIteration}.
- * 
- * @param <K> The type of the vertex key (the vertex identifier).
- * @param <VV> The type of the vertex value (the state of the vertex).
- * @param <Message> The type of the message sent between vertices along the 
edges.
- * @param <EV> The type of the values that are associated with the edges.
- */
-public abstract class MessagingFunction<K, VV, Message, EV> implements 
Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Attributes that allow vertices to access their in/out degrees and 
the total number of vertices
-       //  inside an iteration.
-       // 
--------------------------------------------------------------------------------------------
-
-       private long numberOfVertices = -1L;
-
-       /**
-        * Retrieves the number of vertices in the graph.
-        * @return the number of vertices if the {@link 
org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)}
-        * option has been set; -1 otherwise.
-        */
-       public long getNumberOfVertices() {
-               return numberOfVertices;
-       }
-
-       void setNumberOfVertices(long numberOfVertices) {
-               this.numberOfVertices = numberOfVertices;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Attribute that allows the user to choose the neighborhood 
type(in/out/all) on which to run
-       //  the scatter gather iteration.
-       // 
--------------------------------------------------------------------------------------------
-
-       private EdgeDirection direction;
-
-       /**
-        * Retrieves the edge direction in which messages are propagated in the 
scatter-gather iteration.
-        * @return the messaging {@link EdgeDirection}
-        */
-       public EdgeDirection getDirection() {
-               return direction;
-       }
-
-       void setDirection(EdgeDirection direction) {
-               this.direction = direction;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Public API Methods
-       // 
--------------------------------------------------------------------------------------------
-
-       /**
-        * This method is invoked once per superstep for each vertex that was 
changed in that superstep.
-        * It needs to produce the messages that will be received by vertices 
in the next superstep.
-        * 
-        * @param vertex The vertex that was changed.
-        * 
-        * @throws Exception The computation may throw exceptions, which causes 
the superstep to fail.
-        */
-       public abstract void sendMessages(Vertex<K, VV> vertex) throws 
Exception;
-       
-       /**
-        * This method is executed once per superstep before the vertex update 
function is invoked for each vertex.
-        * 
-        * @throws Exception Exceptions in the pre-superstep phase cause the 
superstep to fail.
-        */
-       public void preSuperstep() throws Exception {}
-       
-       /**
-        * This method is executed once per superstep after the vertex update 
function has been invoked for each vertex.
-        * 
-        * @throws Exception Exceptions in the post-superstep phase cause the 
superstep to fail.
-        */
-       public void postSuperstep() throws Exception {}
-       
-       
-       /**
-        * Gets an {@link java.lang.Iterable} with all edges. This method is 
mutually exclusive with
-        * {@link #sendMessageToAllNeighbors(Object)} and may be called only 
once.
-        * <p>
-        * If the {@link EdgeDirection} is OUT (default), then this iterator 
contains outgoing edges.
-        * If the {@link EdgeDirection} is IN, then this iterator contains 
incoming edges.
-        * If the {@link EdgeDirection} is ALL, then this iterator contains 
both outgoing and incoming edges.
-        * 
-        * @return An iterator with all edges.
-        */
-       @SuppressWarnings("unchecked")
-       public Iterable<Edge<K, EV>> getEdges() {
-               if (edgesUsed) {
-                       throw new IllegalStateException("Can use either 
'getEdges()' or 'sendMessageToAllNeighbors()' exactly once.");
-               }
-               edgesUsed = true;
-               this.edgeIterator.set((Iterator<Edge<K, EV>>) edges);
-               return this.edgeIterator;
-       }
-
-       /**
-        * Sends the given message to all vertices that are targets of an edge 
of the changed vertex.
-        * This method is mutually exclusive to the method {@link #getEdges()} 
and may be called only once.
-        * <p>
-        * If the {@link EdgeDirection} is OUT (default), the message will be 
sent to out-neighbors.
-        * If the {@link EdgeDirection} is IN, the message will be sent to 
in-neighbors.
-        * If the {@link EdgeDirection} is ALL, the message will be sent to all 
neighbors.
-        * 
-        * @param m The message to send.
-        */
-       public void sendMessageToAllNeighbors(Message m) {
-               if (edgesUsed) {
-                       throw new IllegalStateException("Can use either 
'getEdges()' or 'sendMessageToAllNeighbors()'"
-                                       + "exactly once.");
-               }
-               
-               edgesUsed = true;
-               outValue.f1 = m;
-               
-               while (edges.hasNext()) {
-                       Tuple next = (Tuple) edges.next();
-
-                       /*
-                        * When EdgeDirection is OUT, the edges iterator only 
has the out-edges 
-                        * of the vertex, i.e. the ones where this vertex is 
src. 
-                        * next.getField(1) gives the neighbor of the vertex 
running this MessagingFunction.
-                        */
-                       if (getDirection().equals(EdgeDirection.OUT)) {
-                               outValue.f0 = next.getField(1);
-                       }
-                       /*
-                        * When EdgeDirection is IN, the edges iterator only 
has the in-edges 
-                        * of the vertex, i.e. the ones where this vertex is 
trg. 
-                        * next.getField(10) gives the neighbor of the vertex 
running this MessagingFunction.
-                        */
-                       else if (getDirection().equals(EdgeDirection.IN)) {
-                               outValue.f0 = next.getField(0);
-                       }
-                        // When EdgeDirection is ALL, the edges iterator 
contains both in- and out- edges
-                       if (getDirection().equals(EdgeDirection.ALL)) {
-                               if (next.getField(0).equals(vertexId)) {
-                                       // send msg to the trg
-                                       outValue.f0 = next.getField(1);
-                               }
-                               else {
-                                       // send msg to the src
-                                       outValue.f0 = next.getField(0);
-                               }
-                       }
-                       out.collect(outValue);
-               }
-       }
-       
-       /**
-        * Sends the given message to the vertex identified by the given key. 
If the target vertex does not exist,
-        * the next superstep will cause an exception due to a non-deliverable 
message.
-        * 
-        * @param target The key (id) of the target vertex to message.
-        * @param m The message.
-        */
-       public void sendMessageTo(K target, Message m) {
-               outValue.f0 = target;
-               outValue.f1 = m;
-               out.collect(outValue);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Gets the number of the superstep, starting at <tt>1</tt>.
-        * 
-        * @return The number of the current superstep.
-        */
-       public int getSuperstepNumber() {
-               return this.runtimeContext.getSuperstepNumber();
-       }
-       
-       /**
-        * Gets the iteration aggregator registered under the given name. The 
iteration aggregator combines
-        * all aggregates globally once per superstep and makes them available 
in the next superstep.
-        * 
-        * @param name The name of the aggregator.
-        * @return The aggregator registered under this name, or null, if no 
aggregator was registered.
-        */
-       public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-               return this.runtimeContext.<T>getIterationAggregator(name);
-       }
-       
-       /**
-        * Get the aggregated value that an aggregator computed in the previous 
iteration.
-        * 
-        * @param name The name of the aggregator.
-        * @return The aggregated value of the previous iteration.
-        */
-       public <T extends Value> T getPreviousIterationAggregate(String name) {
-               return 
this.runtimeContext.<T>getPreviousIterationAggregate(name);
-       }
-       
-       /**
-        * Gets the broadcast data set registered under the given name. 
Broadcast data sets
-        * are available on all parallel instances of a function. They can be 
registered via
-        * {@link 
org.apache.flink.graph.spargel.ScatterGatherConfiguration#addBroadcastSetForMessagingFunction(String,
 org.apache.flink.api.java.DataSet)}.
-        * 
-        * @param name The name under which the broadcast set is registered.
-        * @return The broadcast data set.
-        */
-       public <T> Collection<T> getBroadcastSet(String name) {
-               return this.runtimeContext.<T>getBroadcastVariable(name);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  internal methods and state
-       // 
--------------------------------------------------------------------------------------------
-       
-       private Tuple2<K, Message> outValue;
-       
-       private IterationRuntimeContext runtimeContext;
-       
-       private Iterator<?> edges;
-       
-       private Collector<Tuple2<K, Message>> out;
-
-       private K vertexId;
-       
-       private EdgesIterator<K, EV> edgeIterator;
-       
-       private boolean edgesUsed;
-
-       private long inDegree = -1;
-
-       private long outDegree = -1;
-       
-       void init(IterationRuntimeContext context) {
-               this.runtimeContext = context;
-               this.outValue = new Tuple2<K, Message>();
-               this.edgeIterator = new EdgesIterator<K, EV>();
-       }
-       
-       void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out, K id) {
-               this.edges = edges;
-               this.out = out;
-               this.vertexId = id;
-               this.edgesUsed = false;
-       }
-       
-       private static final class EdgesIterator<K, EV> 
-               implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>>
-       {
-               private Iterator<Edge<K, EV>> input;
-               
-               private Edge<K, EV> edge = new Edge<K, EV>();
-               
-               void set(Iterator<Edge<K, EV>> input) {
-                       this.input = input;
-               }
-               
-               @Override
-               public boolean hasNext() {
-                       return input.hasNext();
-               }
-
-               @Override
-               public Edge<K, EV> next() {
-                       Edge<K, EV> next = input.next();
-                       edge.setSource(next.f0);
-                       edge.setTarget(next.f1);
-                       edge.setValue(next.f2);
-                       return edge;
-               }
-
-               @Override
-               public void remove() {
-                       throw new UnsupportedOperationException();
-               }
-               @Override
-               public Iterator<Edge<K, EV>> iterator() {
-                       return this;
-               }
-       }
-
-       /**
-        * Retrieves the vertex in-degree (number of in-coming edges).
-        * @return The in-degree of this vertex
-        */
-       public long getInDegree() {
-               return inDegree;
-       }
-
-       void setInDegree(long inDegree) {
-               this.inDegree = inDegree;
-       }
-
-       /**
-        * Retrieve the vertex out-degree (number of out-going edges).
-        * @return The out-degree of this vertex
-        */
-       public long getOutDegree() {
-               return outDegree;
-       }
-
-       void setOutDegree(long outDegree) {
-               this.outDegree = outDegree;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
new file mode 100644
index 0000000..336e73d
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.spargel;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * The base class for functions that produce messages between vertices as a 
part of a {@link ScatterGatherIteration}.
+ * 
+ * @param <K> The type of the vertex key (the vertex identifier).
+ * @param <VV> The type of the vertex value (the state of the vertex).
+ * @param <Message> The type of the message sent between vertices along the 
edges.
+ * @param <EV> The type of the values that are associated with the edges.
+ */
+public abstract class ScatterFunction<K, VV, Message, EV> implements 
Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Attributes that allow vertices to access their in/out degrees and 
the total number of vertices
+       //  inside an iteration.
+       // 
--------------------------------------------------------------------------------------------
+
+       private long numberOfVertices = -1L;
+
+       /**
+        * Retrieves the number of vertices in the graph.
+        * @return the number of vertices if the {@link 
org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)}
+        * option has been set; -1 otherwise.
+        */
+       public long getNumberOfVertices() {
+               return numberOfVertices;
+       }
+
+       void setNumberOfVertices(long numberOfVertices) {
+               this.numberOfVertices = numberOfVertices;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Attribute that allows the user to choose the neighborhood 
type (in/out/all) on which to run
+       //  the scatter-gather iteration.
+       // 
--------------------------------------------------------------------------------------------
+
+       private EdgeDirection direction;
+
+       /**
+        * Retrieves the edge direction in which messages are propagated in the 
scatter-gather iteration.
+        * @return the messaging {@link EdgeDirection}
+        */
+       public EdgeDirection getDirection() {
+               return direction;
+       }
+
+       void setDirection(EdgeDirection direction) {
+               this.direction = direction;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Public API Methods
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * This method is invoked once per superstep for each vertex that was 
changed in that superstep.
+        * It needs to produce the messages that will be received by vertices 
in the next superstep.
+        * 
+        * @param vertex The vertex that was changed.
+        * 
+        * @throws Exception The computation may throw exceptions, which causes 
the superstep to fail.
+        */
+       public abstract void sendMessages(Vertex<K, VV> vertex) throws 
Exception;
+
+       /**
+        * This method is executed once per superstep before the scatter 
function is invoked for each vertex.
+        * 
+        * @throws Exception Exceptions in the pre-superstep phase cause the 
superstep to fail.
+        */
+       public void preSuperstep() throws Exception {}
+
+       /**
+        * This method is executed once per superstep after the scatter 
function has been invoked for each vertex.
+        * 
+        * @throws Exception Exceptions in the post-superstep phase cause the 
superstep to fail.
+        */
+       public void postSuperstep() throws Exception {}
+
+
+       /**
+        * Gets an {@link java.lang.Iterable} with all edges. This method is 
mutually exclusive with
+        * {@link #sendMessageToAllNeighbors(Object)} and may be called only 
once.
+        * <p>
+        * If the {@link EdgeDirection} is OUT (default), then this iterator 
contains outgoing edges.
+        * If the {@link EdgeDirection} is IN, then this iterator contains 
incoming edges.
+        * If the {@link EdgeDirection} is ALL, then this iterator contains 
both outgoing and incoming edges.
+        * 
+        * @return An iterator with all edges.
+        */
+       @SuppressWarnings("unchecked")
+       public Iterable<Edge<K, EV>> getEdges() {
+               if (edgesUsed) {
+                       throw new IllegalStateException("Can use either 
'getEdges()' or 'sendMessageToAllNeighbors()' exactly once.");
+               }
+               edgesUsed = true;
+               this.edgeIterator.set((Iterator<Edge<K, EV>>) edges);
+               return this.edgeIterator;
+       }
+
+       /**
+        * Sends the given message to all vertices that are targets of an edge 
of the changed vertex.
+        * This method is mutually exclusive to the method {@link #getEdges()} 
and may be called only once.
+        * <p>
+        * If the {@link EdgeDirection} is OUT (default), the message will be 
sent to out-neighbors.
+        * If the {@link EdgeDirection} is IN, the message will be sent to 
in-neighbors.
+        * If the {@link EdgeDirection} is ALL, the message will be sent to all 
neighbors.
+        * 
+        * @param m The message to send.
+        */
+       public void sendMessageToAllNeighbors(Message m) {
+               if (edgesUsed) {
+                       throw new IllegalStateException("Can use either 
'getEdges()' or 'sendMessageToAllNeighbors()'"
+                                       + "exactly once.");
+               }
+
+               edgesUsed = true;
+               outValue.f1 = m;
+
+               while (edges.hasNext()) {
+                       Tuple next = (Tuple) edges.next();
+
+                       /*
+                        * When EdgeDirection is OUT, the edges iterator only 
has the out-edges 
+                        * of the vertex, i.e. the ones where this vertex is 
src. 
+                        * next.getField(1) gives the neighbor of the vertex 
running this ScatterFunction.
+                        */
+                       if (getDirection().equals(EdgeDirection.OUT)) {
+                               outValue.f0 = next.getField(1);
+                       }
+                       /*
+                        * When EdgeDirection is IN, the edges iterator only 
has the in-edges 
+                        * of the vertex, i.e. the ones where this vertex is 
trg. 
+                        * next.getField(0) gives the neighbor of the vertex 
running this ScatterFunction.
+                        */
+                       else if (getDirection().equals(EdgeDirection.IN)) {
+                               outValue.f0 = next.getField(0);
+                       }
+                        // When EdgeDirection is ALL, the edges iterator 
contains both in- and out- edges
+                       if (getDirection().equals(EdgeDirection.ALL)) {
+                               if (next.getField(0).equals(vertexId)) {
+                                       // send msg to the trg
+                                       outValue.f0 = next.getField(1);
+                               }
+                               else {
+                                       // send msg to the src
+                                       outValue.f0 = next.getField(0);
+                               }
+                       }
+                       out.collect(outValue);
+               }
+       }
+
+       /**
+        * Sends the given message to the vertex identified by the given key. 
If the target vertex does not exist,
+        * the next superstep will cause an exception due to a non-deliverable 
message.
+        * 
+        * @param target The key (id) of the target vertex to message.
+        * @param m The message.
+        */
+       public void sendMessageTo(K target, Message m) {
+               outValue.f0 = target;
+               outValue.f1 = m;
+               out.collect(outValue);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Gets the number of the superstep, starting at <tt>1</tt>.
+        * 
+        * @return The number of the current superstep.
+        */
+       public int getSuperstepNumber() {
+               return this.runtimeContext.getSuperstepNumber();
+       }
+
+       /**
+        * Gets the iteration aggregator registered under the given name. The 
iteration aggregator combines
+        * all aggregates globally once per superstep and makes them available 
in the next superstep.
+        * 
+        * @param name The name of the aggregator.
+        * @return The aggregator registered under this name, or null, if no 
aggregator was registered.
+        */
+       public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+               return this.runtimeContext.<T>getIterationAggregator(name);
+       }
+
+       /**
+        * Get the aggregated value that an aggregator computed in the previous 
iteration.
+        * 
+        * @param name The name of the aggregator.
+        * @return The aggregated value of the previous iteration.
+        */
+       public <T extends Value> T getPreviousIterationAggregate(String name) {
+               return 
this.runtimeContext.<T>getPreviousIterationAggregate(name);
+       }
+
+       /**
+        * Gets the broadcast data set registered under the given name. 
Broadcast data sets
+        * are available on all parallel instances of a function. They can be 
registered via
+        * {@link 
org.apache.flink.graph.spargel.ScatterGatherConfiguration#addBroadcastSetForScatterFunction(String,
 org.apache.flink.api.java.DataSet)}.
+        * 
+        * @param name The name under which the broadcast set is registered.
+        * @return The broadcast data set.
+        */
+       public <T> Collection<T> getBroadcastSet(String name) {
+               return this.runtimeContext.<T>getBroadcastVariable(name);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  internal methods and state
+       // 
--------------------------------------------------------------------------------------------
+
+       private Tuple2<K, Message> outValue;
+
+       private IterationRuntimeContext runtimeContext;
+
+       private Iterator<?> edges;
+
+       private Collector<Tuple2<K, Message>> out;
+
+       private K vertexId;
+
+       private EdgesIterator<K, EV> edgeIterator;
+
+       private boolean edgesUsed;
+
+       private long inDegree = -1;
+
+       private long outDegree = -1;
+
+       void init(IterationRuntimeContext context) {
+               this.runtimeContext = context;
+               this.outValue = new Tuple2<K, Message>();
+               this.edgeIterator = new EdgesIterator<K, EV>();
+       }
+
+       void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out, K id) {
+               this.edges = edges;
+               this.out = out;
+               this.vertexId = id;
+               this.edgesUsed = false;
+       }
+
+       private static final class EdgesIterator<K, EV> 
+               implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>>
+       {
+               private Iterator<Edge<K, EV>> input;
+
+               private Edge<K, EV> edge = new Edge<K, EV>();
+
+               void set(Iterator<Edge<K, EV>> input) {
+                       this.input = input;
+               }
+
+               @Override
+               public boolean hasNext() {
+                       return input.hasNext();
+               }
+
+               @Override
+               public Edge<K, EV> next() {
+                       Edge<K, EV> next = input.next();
+                       edge.setSource(next.f0);
+                       edge.setTarget(next.f1);
+                       edge.setValue(next.f2);
+                       return edge;
+               }
+
+               @Override
+               public void remove() {
+                       throw new UnsupportedOperationException();
+               }
+               @Override
+               public Iterator<Edge<K, EV>> iterator() {
+                       return this;
+               }
+       }
+
+       /**
+        * Retrieves the vertex in-degree (number of in-coming edges).
+        * @return The in-degree of this vertex
+        */
+       public long getInDegree() {
+               return inDegree;
+       }
+
+       void setInDegree(long inDegree) {
+               this.inDegree = inDegree;
+       }
+
+       /**
+        * Retrieve the vertex out-degree (number of out-going edges).
+        * @return The out-degree of this vertex
+        */
+       public long getOutDegree() {
+               return outDegree;
+       }
+
+       void setOutDegree(long outDegree) {
+               this.outDegree = outDegree;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
index 3a3de64..4ac1ae1 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
@@ -29,20 +29,20 @@ import java.util.List;
 /**
  * A ScatterGatherConfiguration object can be used to set the iteration name 
and
  * degree of parallelism, to register aggregators and use broadcast sets in
- * the {@link org.apache.flink.graph.spargel.VertexUpdateFunction} and {@link 
org.apache.flink.graph.spargel.MessagingFunction}
+ * the {@link GatherFunction} and {@link ScatterFunction}
  *
  * The VertexCentricConfiguration object is passed as an argument to
  * {@link org.apache.flink.graph.Graph#runScatterGatherIteration (
- * org.apache.flink.graph.spargel.VertexUpdateFunction, 
org.apache.flink.graph.spargel.MessagingFunction, int,
+ * org.apache.flink.graph.spargel.GatherFunction, 
org.apache.flink.graph.spargel.ScatterFunction, int,
  * ScatterGatherConfiguration)}.
  */
 public class ScatterGatherConfiguration extends IterationConfiguration {
 
-       /** the broadcast variables for the update function **/
-       private List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new 
ArrayList<Tuple2<String,DataSet<?>>>();
+       /** the broadcast variables for the scatter function **/
+       private List<Tuple2<String, DataSet<?>>> bcVarsScatter = new 
ArrayList<>();
 
-       /** the broadcast variables for the messaging function **/
-       private List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new 
ArrayList<Tuple2<String,DataSet<?>>>();
+       /** the broadcast variables for the gather function **/
+       private List<Tuple2<String, DataSet<?>>> bcVarsGather = new 
ArrayList<>();
 
        /** flag that defines whether the degrees option is set **/
        private boolean optDegrees = false;
@@ -53,43 +53,43 @@ public class ScatterGatherConfiguration extends 
IterationConfiguration {
        public ScatterGatherConfiguration() {}
 
        /**
-        * Adds a data set as a broadcast set to the messaging function.
+        * Adds a data set as a broadcast set to the scatter function.
         *
-        * @param name The name under which the broadcast data is available in 
the messaging function.
+        * @param name The name under which the broadcast data is available in 
the scatter function.
         * @param data The data set to be broadcasted.
         */
-       public void addBroadcastSetForMessagingFunction(String name, DataSet<?> 
data) {
-               this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, 
data));
+       public void addBroadcastSetForScatterFunction(String name, DataSet<?> 
data) {
+               this.bcVarsScatter.add(new Tuple2<String, DataSet<?>>(name, 
data));
        }
 
        /**
-        * Adds a data set as a broadcast set to the vertex update function.
+        * Adds a data set as a broadcast set to the gather function.
         *
-        * @param name The name under which the broadcast data is available in 
the vertex update function.
+        * @param name The name under which the broadcast data is available in 
the gather function.
         * @param data The data set to be broadcasted.
         */
-       public void addBroadcastSetForUpdateFunction(String name, DataSet<?> 
data) {
-               this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, 
data));
+       public void addBroadcastSetForGatherFunction(String name, DataSet<?> 
data) {
+               this.bcVarsGather.add(new Tuple2<String, DataSet<?>>(name, 
data));
        }
 
        /**
-        * Get the broadcast variables of the VertexUpdateFunction.
+        * Get the broadcast variables of the ScatterFunction.
         *
         * @return a List of Tuple2, where the first field is the broadcast 
variable name
         * and the second field is the broadcast DataSet.
         */
-       public List<Tuple2<String, DataSet<?>>> getUpdateBcastVars() {
-               return this.bcVarsUpdate;
+       public List<Tuple2<String, DataSet<?>>> getScatterBcastVars() {
+               return this.bcVarsScatter;
        }
 
        /**
-        * Get the broadcast variables of the MessagingFunction.
+        * Get the broadcast variables of the GatherFunction.
         *
         * @return a List of Tuple2, where the first field is the broadcast 
variable name
         * and the second field is the broadcast DataSet.
         */
-       public List<Tuple2<String, DataSet<?>>> getMessagingBcastVars() {
-               return this.bcVarsMessaging;
+       public List<Tuple2<String, DataSet<?>>> getGatherBcastVars() {
+               return this.bcVarsGather;
        }
 
        /**
@@ -113,7 +113,7 @@ public class ScatterGatherConfiguration extends 
IterationConfiguration {
        }
 
        /**
-        * Gets the direction in which messages are sent in the 
MessagingFunction.
+        * Gets the direction in which messages are sent in the ScatterFunction.
         * By default the messaging direction is OUT.
         *
         * @return an EdgeDirection, which can be either IN, OUT or ALL.
@@ -123,7 +123,7 @@ public class ScatterGatherConfiguration extends 
IterationConfiguration {
        }
 
        /**
-        * Sets the direction in which messages are sent in the 
MessagingFunction.
+        * Sets the direction in which messages are sent in the ScatterFunction.
         * By default the messaging direction is OUT.
         *
         * @param direction - IN, OUT or ALL

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
index fc5c210..fde305f 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
@@ -53,21 +53,21 @@ import java.util.Map;
  * Scatter-Gather algorithms operate on graphs, which are defined through 
vertices and edges. The 
  * algorithms send messages along the edges and update the state of vertices 
based on
  * the old state and the incoming messages. All vertices have an initial state.
- * The computation terminates once no vertex updates it state any more.
+ * The computation terminates once no vertex updates its state any more.
  * Additionally, a maximum number of iterations (supersteps) may be specified.
  * <p>
  * The computation is here represented by two functions:
  * <ul>
- *   <li>The {@link VertexUpdateFunction} receives incoming messages and may 
updates the state for
+ *   <li>The {@link GatherFunction} receives incoming messages and may update 
the state for
  *   the vertex. If a state is updated, messages are sent from this vertex. 
Initially, all vertices are
  *   considered updated.</li>
- *   <li>The {@link MessagingFunction} takes the new vertex state and sends 
messages along the outgoing
+ *   <li>The {@link ScatterFunction} takes the new vertex state and sends 
messages along the outgoing
  *   edges of the vertex. The outgoing edges may optionally have an associated 
value, such as a weight.</li>
  * </ul>
  * <p>
  *
  * Scatter-Gather graph iterations are are run by calling
- * {@link Graph#runScatterGatherIteration(VertexUpdateFunction, 
MessagingFunction, int)}.
+ * {@link Graph#runScatterGatherIteration(ScatterFunction, GatherFunction, 
int)}.
  *
  * @param <K> The type of the vertex key (the vertex identifier).
  * @param <VV> The type of the vertex value (the state of the vertex).
@@ -77,47 +77,47 @@ import java.util.Map;
 public class ScatterGatherIteration<K, VV, Message, EV> 
        implements CustomUnaryOperation<Vertex<K, VV>, Vertex<K, VV>>
 {
-       private final VertexUpdateFunction<K, VV, Message> updateFunction;
+       private final ScatterFunction<K, VV, Message, EV> scatterFunction;
+
+       private final GatherFunction<K, VV, Message> gatherFunction;
 
-       private final MessagingFunction<K, VV, Message, EV> messagingFunction;
-       
        private final DataSet<Edge<K, EV>> edgesWithValue;
-       
+
        private final int maximumNumberOfIterations;
-       
+
        private final TypeInformation<Message> messageType;
-       
+
        private DataSet<Vertex<K, VV>> initialVertices;
 
        private ScatterGatherConfiguration configuration;
 
        // 
----------------------------------------------------------------------------------
-       
-       private ScatterGatherIteration(VertexUpdateFunction<K, VV, Message> uf,
-                       MessagingFunction<K, VV, Message, EV> mf,
+
+       private ScatterGatherIteration(ScatterFunction<K, VV, Message, EV> sf,
+                       GatherFunction<K, VV, Message> gf,
                        DataSet<Edge<K, EV>> edgesWithValue, 
                        int maximumNumberOfIterations)
        {
-               Preconditions.checkNotNull(uf);
-               Preconditions.checkNotNull(mf);
+               Preconditions.checkNotNull(sf);
+               Preconditions.checkNotNull(gf);
                Preconditions.checkNotNull(edgesWithValue);
                Preconditions.checkArgument(maximumNumberOfIterations > 0, "The 
maximum number of iterations must be at least one.");
 
-               this.updateFunction = uf;
-               this.messagingFunction = mf;
+               this.scatterFunction = sf;
+               this.gatherFunction = gf;
                this.edgesWithValue = edgesWithValue;
-               this.maximumNumberOfIterations = maximumNumberOfIterations;     
        
-               this.messageType = getMessageType(mf);
+               this.maximumNumberOfIterations = maximumNumberOfIterations;
+               this.messageType = getMessageType(sf);
        }
-       
-       private TypeInformation<Message> getMessageType(MessagingFunction<K, 
VV, Message, EV> mf) {
-               return TypeExtractor.createTypeInfo(mf, 
MessagingFunction.class, mf.getClass(), 2);
+
+       private TypeInformation<Message> getMessageType(ScatterFunction<K, VV, 
Message, EV> mf) {
+               return TypeExtractor.createTypeInfo(mf, ScatterFunction.class, 
mf.getClass(), 2);
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
        //  Custom Operator behavior
        // 
--------------------------------------------------------------------------------------------
-       
+
        /**
         * Sets the input data set for this operator. In the case of this 
operator this input data set represents
         * the set of vertices with their initial state.
@@ -131,7 +131,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
        public void setInput(DataSet<Vertex<K, VV>> inputData) {
                this.initialVertices = inputData;
        }
-       
+
        /**
         * Creates the operator that represents this scatter-gather graph 
computation.
         * 
@@ -145,14 +145,14 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 
                // prepare some type information
                TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
initialVertices.getType()).getTypeAt(0);
-               TypeInformation<Tuple2<K, Message>> messageTypeInfo = new 
TupleTypeInfo<Tuple2<K,Message>>(keyType, messageType);
+               TypeInformation<Tuple2<K, Message>> messageTypeInfo = new 
TupleTypeInfo<>(keyType, messageType);
 
                // create a graph
                Graph<K, VV, EV> graph =
                                Graph.fromDataSet(initialVertices, 
edgesWithValue, initialVertices.getExecutionEnvironment());
 
                // check whether the numVertices option is set and, if so, 
compute the total number of vertices
-               // and set it within the messaging and update functions
+               // and set it within the scatter and gather functions
 
                DataSet<LongValue> numberOfVertices = null;
                if (this.configuration != null && 
this.configuration.isOptNumVertices()) {
@@ -164,13 +164,13 @@ public class ScatterGatherIteration<K, VV, Message, EV>
                }
 
                if(this.configuration != null) {
-                       
messagingFunction.setDirection(this.configuration.getDirection());
+                       
scatterFunction.setDirection(this.configuration.getDirection());
                } else {
-                       messagingFunction.setDirection(EdgeDirection.OUT);
+                       scatterFunction.setDirection(EdgeDirection.OUT);
                }
 
                // retrieve the direction in which the updates are made and in 
which the messages are sent
-               EdgeDirection messagingDirection = 
messagingFunction.getDirection();
+               EdgeDirection messagingDirection = 
scatterFunction.getDirection();
 
                // check whether the degrees option is set and, if so, compute 
the in and the out degrees and
                // add them to the vertex value
@@ -186,9 +186,9 @@ public class ScatterGatherIteration<K, VV, Message, EV>
         * a weight or distance).
         * 
         * @param edgesWithValue The data set containing edges.
-        * @param uf The function that updates the state of the vertices from 
the incoming messages.
-        * @param mf The function that turns changed vertex states into 
messages along the edges.
-        * 
+        * @param sf The function that turns changed vertex states into 
messages along the edges.
+        * @param gf The function that updates the state of the vertices from 
the incoming messages.
+        *
         * @param <K> The type of the vertex key (the vertex identifier).
         * @param <VV> The type of the vertex value (the state of the vertex).
         * @param <Message> The type of the message sent between vertices along 
the edges.
@@ -196,14 +196,11 @@ public class ScatterGatherIteration<K, VV, Message, EV>
         * 
         * @return An in stance of the scatter-gather graph computation 
operator.
         */
-       public static final <K, VV, Message, EV>
-                       ScatterGatherIteration<K, VV, Message, EV> withEdges(
-                                       DataSet<Edge<K, EV>> edgesWithValue,
-                                       VertexUpdateFunction<K, VV, Message> uf,
-                                       MessagingFunction<K, VV, Message, EV> 
mf,
-                                       int maximumNumberOfIterations)
+       public static final <K, VV, Message, EV> ScatterGatherIteration<K, VV, 
Message, EV> withEdges(
+               DataSet<Edge<K, EV>> edgesWithValue, ScatterFunction<K, VV, 
Message, EV> sf,
+               GatherFunction<K, VV, Message> gf, int 
maximumNumberOfIterations)
        {
-               return new ScatterGatherIteration<K, VV, Message, EV>(uf, mf, 
edgesWithValue, maximumNumberOfIterations);
+               return new ScatterGatherIteration<>(sf, gf, edgesWithValue, 
maximumNumberOfIterations);
        }
 
        /**
@@ -226,23 +223,122 @@ public class ScatterGatherIteration<K, VV, Message, EV>
        //  Wrapping UDFs
        // 
--------------------------------------------------------------------------------------------
 
-       private static abstract class VertexUpdateUdf<K, VVWithDegrees, 
Message> extends RichCoGroupFunction<
+       /*
+        * UDF that encapsulates the message sending function for graphs where 
the edges have an associated value.
+        */
+       private static abstract class ScatterUdfWithEdgeValues<K, 
VVWithDegrees, VV, Message, EV>
+                       extends RichCoGroupFunction<Edge<K, EV>, Vertex<K, 
VVWithDegrees>, Tuple2<K, Message>>
+                       implements ResultTypeQueryable<Tuple2<K, Message>>
+       {
+               private static final long serialVersionUID = 1L;
+
+               final ScatterFunction<K, VV, Message, EV> scatterFunction;
+
+               private transient TypeInformation<Tuple2<K, Message>> 
resultType;
+
+
+               private ScatterUdfWithEdgeValues(ScatterFunction<K, VV, 
Message, EV> scatterFunction,
+                               TypeInformation<Tuple2<K, Message>> resultType)
+               {
+                       this.scatterFunction = scatterFunction;
+                       this.resultType = resultType;
+               }
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       if (getRuntimeContext().hasBroadcastVariable("number of 
vertices")) {
+                               Collection<LongValue> numberOfVertices = 
getRuntimeContext().getBroadcastVariable("number of vertices");
+                               
this.scatterFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
+                       }
+                       if (getIterationRuntimeContext().getSuperstepNumber() 
== 1) {
+                               
this.scatterFunction.init(getIterationRuntimeContext());
+                       }
+                       this.scatterFunction.preSuperstep();
+               }
+
+               @Override
+               public void close() throws Exception {
+                       this.scatterFunction.postSuperstep();
+               }
+
+               @Override
+               public TypeInformation<Tuple2<K, Message>> getProducedType() {
+                       return this.resultType;
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class ScatterUdfWithEVsSimpleVV<K, VV, Message, EV>
+                       extends ScatterUdfWithEdgeValues<K, VV, VV, Message, 
EV> {
+
+               private ScatterUdfWithEVsSimpleVV(ScatterFunction<K, VV, 
Message, EV> scatterFunction,
+                               TypeInformation<Tuple2<K, Message>> resultType) 
{
+                       super(scatterFunction, resultType);
+               }
+
+               @Override
+               public void coGroup(Iterable<Edge<K, EV>> edges,
+                                                       Iterable<Vertex<K, VV>> 
state,
+                                                       Collector<Tuple2<K, 
Message>> out) throws Exception {
+                       final Iterator<Vertex<K, VV>> stateIter = 
state.iterator();
+
+                       if (stateIter.hasNext()) {
+                               Vertex<K, VV> newVertexState = stateIter.next();
+                               scatterFunction.set(edges.iterator(), out, 
newVertexState.getId());
+                               scatterFunction.sendMessages(newVertexState);
+                       }
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class ScatterUdfWithEVsVVWithDegrees<K, VV, 
Message, EV>
+                       extends ScatterUdfWithEdgeValues<K, Tuple3<VV, 
LongValue, LongValue>, VV, Message, EV> {
+
+               private Vertex<K, VV> nextVertex = new Vertex<>();
+
+               private ScatterUdfWithEVsVVWithDegrees(ScatterFunction<K, VV, 
Message, EV> scatterFunction,
+                               TypeInformation<Tuple2<K, Message>> resultType) 
{
+                       super(scatterFunction, resultType);
+               }
+
+               @Override
+               public void coGroup(Iterable<Edge<K, EV>> edges, 
Iterable<Vertex<K, Tuple3<VV, LongValue, LongValue>>> state,
+                                                       Collector<Tuple2<K, 
Message>> out) throws Exception {
+
+                       final Iterator<Vertex<K, Tuple3<VV, LongValue, 
LongValue>>> stateIter = state.iterator();
+
+                       if (stateIter.hasNext()) {
+                               Vertex<K, Tuple3<VV, LongValue, LongValue>> 
vertexWithDegrees = stateIter.next();
+
+                               nextVertex.setField(vertexWithDegrees.f0, 0);
+                               nextVertex.setField(vertexWithDegrees.f1.f0, 1);
+
+                               
scatterFunction.setInDegree(vertexWithDegrees.f1.f1.getValue());
+                               
scatterFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue());
+
+                               scatterFunction.set(edges.iterator(), out, 
vertexWithDegrees.getId());
+                               scatterFunction.sendMessages(nextVertex);
+                       }
+               }
+       }
+
+       private static abstract class GatherUdf<K, VVWithDegrees, Message> 
extends RichCoGroupFunction<
                Tuple2<K, Message>, Vertex<K, VVWithDegrees>, Vertex<K, 
VVWithDegrees>>
                implements ResultTypeQueryable<Vertex<K, VVWithDegrees>>
        {
                private static final long serialVersionUID = 1L;
-               
-               final VertexUpdateFunction<K, VVWithDegrees, Message> 
vertexUpdateFunction;
 
-               final MessageIterator<Message> messageIter = new 
MessageIterator<Message>();
-               
+               final GatherFunction<K, VVWithDegrees, Message> gatherFunction;
+
+               final MessageIterator<Message> messageIter = new 
MessageIterator<>();
+
                private transient TypeInformation<Vertex<K, VVWithDegrees>> 
resultType;
-               
-               
-               private VertexUpdateUdf(VertexUpdateFunction<K, VVWithDegrees, 
Message> vertexUpdateFunction,
+
+
+               private GatherUdf(GatherFunction<K, VVWithDegrees, Message> 
gatherFunction,
                                TypeInformation<Vertex<K, VVWithDegrees>> 
resultType)
                {
-                       this.vertexUpdateFunction = vertexUpdateFunction;
+                       this.gatherFunction = gatherFunction;
                        this.resultType = resultType;
                }
 
@@ -250,17 +346,17 @@ public class ScatterGatherIteration<K, VV, Message, EV>
                public void open(Configuration parameters) throws Exception {
                        if (getRuntimeContext().hasBroadcastVariable("number of 
vertices")) {
                                Collection<LongValue> numberOfVertices = 
getRuntimeContext().getBroadcastVariable("number of vertices");
-                               
this.vertexUpdateFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
+                               
this.gatherFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
                        }
                        if (getIterationRuntimeContext().getSuperstepNumber() 
== 1) {
-                               
this.vertexUpdateFunction.init(getIterationRuntimeContext());
+                               
this.gatherFunction.init(getIterationRuntimeContext());
                        }
-                       this.vertexUpdateFunction.preSuperstep();
+                       this.gatherFunction.preSuperstep();
                }
-               
+
                @Override
                public void close() throws Exception {
-                       this.vertexUpdateFunction.postSuperstep();
+                       this.gatherFunction.postSuperstep();
                }
 
                @Override
@@ -270,10 +366,10 @@ public class ScatterGatherIteration<K, VV, Message, EV>
        }
 
        @SuppressWarnings("serial")
-       private static final class VertexUpdateUdfSimpleVV<K, VV, Message> 
extends VertexUpdateUdf<K, VV, Message> {
+       private static final class GatherUdfSimpleVV<K, VV, Message> extends 
GatherUdf<K, VV, Message> {
 
-               private VertexUpdateUdfSimpleVV(VertexUpdateFunction<K, VV, 
Message> vertexUpdateFunction, TypeInformation<Vertex<K, VV>> resultType) {
-                       super(vertexUpdateFunction, resultType);
+               private GatherUdfSimpleVV(GatherFunction<K, VV, Message> 
gatherFunction, TypeInformation<Vertex<K, VV>> resultType) {
+                       super(gatherFunction, resultType);
                }
 
                @Override
@@ -289,8 +385,8 @@ public class ScatterGatherIteration<K, VV, Message, EV>
                                Iterator<Tuple2<?, Message>> downcastIter = 
(Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
                                messageIter.setSource(downcastIter);
 
-                               vertexUpdateFunction.setOutput(vertexState, 
out);
-                               vertexUpdateFunction.updateVertex(vertexState, 
messageIter);
+                               gatherFunction.setOutput(vertexState, out);
+                               gatherFunction.updateVertex(vertexState, 
messageIter);
                        }
                        else {
                                final Iterator<Tuple2<K, Message>> messageIter 
= messages.iterator();
@@ -299,7 +395,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
                                        try {
                                                Tuple2<K, Message> next = 
messageIter.next();
                                                message = "Target vertex '" + 
next.f0 + "' does not exist!.";
-                                       } catch (Throwable t) {}
+                                       } catch (Throwable ignored) {}
                                        throw new Exception(message);
                                } else {
                                        throw new Exception();
@@ -309,31 +405,31 @@ public class ScatterGatherIteration<K, VV, Message, EV>
        }
 
        @SuppressWarnings("serial")
-       private static final class VertexUpdateUdfVVWithDegrees<K, VV, Message> 
extends VertexUpdateUdf<K, Tuple3<VV, LongValue, LongValue>, Message> {
+       private static final class GatherUdfVVWithDegrees<K, VV, Message> 
extends GatherUdf<K, Tuple3<VV, LongValue, LongValue>, Message> {
 
-               private VertexUpdateUdfVVWithDegrees(VertexUpdateFunction<K, 
Tuple3<VV, LongValue, LongValue>, Message> vertexUpdateFunction,
+               private GatherUdfVVWithDegrees(GatherFunction<K, Tuple3<VV, 
LongValue, LongValue>, Message> gatherFunction,
                                TypeInformation<Vertex<K, Tuple3<VV, LongValue, 
LongValue>>> resultType) {
-                       super(vertexUpdateFunction, resultType);
+                       super(gatherFunction, resultType);
                }
-               
+
                @Override
                public void coGroup(Iterable<Tuple2<K, Message>> messages, 
Iterable<Vertex<K, Tuple3<VV, LongValue, LongValue>>> vertex,
                                                        Collector<Vertex<K, 
Tuple3<VV, LongValue, LongValue>>> out) throws Exception {
 
                        final Iterator<Vertex<K, Tuple3<VV, LongValue, 
LongValue>>> vertexIter = vertex.iterator();
-               
+
                        if (vertexIter.hasNext()) {
                                Vertex<K, Tuple3<VV, LongValue, LongValue>> 
vertexWithDegrees = vertexIter.next();
-               
+
                                @SuppressWarnings("unchecked")
                                Iterator<Tuple2<?, Message>> downcastIter = 
(Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
                                messageIter.setSource(downcastIter);
 
-                               
vertexUpdateFunction.setInDegree(vertexWithDegrees.f1.f1.getValue());
-                               
vertexUpdateFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue());
+                               
gatherFunction.setInDegree(vertexWithDegrees.f1.f1.getValue());
+                               
gatherFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue());
 
-                               
vertexUpdateFunction.setOutputWithDegrees(vertexWithDegrees, out);
-                               
vertexUpdateFunction.updateVertexFromScatterGatherIteration(vertexWithDegrees, 
messageIter);
+                               
gatherFunction.setOutputWithDegrees(vertexWithDegrees, out);
+                               
gatherFunction.updateVertexFromScatterGatherIteration(vertexWithDegrees, 
messageIter);
                        }
                        else {
                                final Iterator<Tuple2<K, Message>> messageIter 
= messages.iterator();
@@ -342,7 +438,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
                                        try {
                                                Tuple2<K, Message> next = 
messageIter.next();
                                                message = "Target vertex '" + 
next.f0 + "' does not exist!.";
-                                       } catch (Throwable t) {}
+                                       } catch (Throwable ignored) {}
                                        throw new Exception(message);
                                } else {
                                        throw new Exception();
@@ -351,112 +447,12 @@ public class ScatterGatherIteration<K, VV, Message, EV>
                }
        }
 
-       /*
-        * UDF that encapsulates the message sending function for graphs where 
the edges have an associated value.
-        */
-       private static abstract class MessagingUdfWithEdgeValues<K, 
VVWithDegrees, VV, Message, EV>
-               extends RichCoGroupFunction<Edge<K, EV>, Vertex<K, 
VVWithDegrees>, Tuple2<K, Message>>
-               implements ResultTypeQueryable<Tuple2<K, Message>>
-       {
-               private static final long serialVersionUID = 1L;
-               
-               final MessagingFunction<K, VV, Message, EV> messagingFunction;
-               
-               private transient TypeInformation<Tuple2<K, Message>> 
resultType;
-       
-       
-               private MessagingUdfWithEdgeValues(MessagingFunction<K, VV, 
Message, EV> messagingFunction,
-                               TypeInformation<Tuple2<K, Message>> resultType)
-               {
-                       this.messagingFunction = messagingFunction;
-                       this.resultType = resultType;
-               }
-               
-               @Override
-               public void open(Configuration parameters) throws Exception {
-                       if (getRuntimeContext().hasBroadcastVariable("number of 
vertices")) {
-                               Collection<LongValue> numberOfVertices = 
getRuntimeContext().getBroadcastVariable("number of vertices");
-                               
this.messagingFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
-                       }
-                       if (getIterationRuntimeContext().getSuperstepNumber() 
== 1) {
-                               
this.messagingFunction.init(getIterationRuntimeContext());
-                       }
-                       this.messagingFunction.preSuperstep();
-               }
-               
-               @Override
-               public void close() throws Exception {
-                       this.messagingFunction.postSuperstep();
-               }
-               
-               @Override
-               public TypeInformation<Tuple2<K, Message>> getProducedType() {
-                       return this.resultType;
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class MessagingUdfWithEVsSimpleVV<K, VV, Message, 
EV>
-               extends MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> {
-
-               private MessagingUdfWithEVsSimpleVV(MessagingFunction<K, VV, 
Message, EV> messagingFunction,
-                       TypeInformation<Tuple2<K, Message>> resultType) {
-                       super(messagingFunction, resultType);
-               }
-
-               @Override
-               public void coGroup(Iterable<Edge<K, EV>> edges,
-                                                       Iterable<Vertex<K, VV>> 
state,
-                                                       Collector<Tuple2<K, 
Message>> out) throws Exception {
-                       final Iterator<Vertex<K, VV>> stateIter = 
state.iterator();
-               
-                       if (stateIter.hasNext()) {
-                               Vertex<K, VV> newVertexState = stateIter.next();
-                               messagingFunction.set((Iterator<?>) 
edges.iterator(), out, newVertexState.getId());
-                               messagingFunction.sendMessages(newVertexState);
-                       }
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class MessagingUdfWithEVsVVWithDegrees<K, VV, 
Message, EV>
-               extends MessagingUdfWithEdgeValues<K, Tuple3<VV, LongValue, 
LongValue>, VV, Message, EV> {
-
-               private Vertex<K, VV> nextVertex = new Vertex<K, VV>();
-
-               private MessagingUdfWithEVsVVWithDegrees(MessagingFunction<K, 
VV, Message, EV> messagingFunction,
-                               TypeInformation<Tuple2<K, Message>> resultType) 
{
-                       super(messagingFunction, resultType);
-               }
-
-               @Override
-               public void coGroup(Iterable<Edge<K, EV>> edges, 
Iterable<Vertex<K, Tuple3<VV, LongValue, LongValue>>> state,
-                               Collector<Tuple2<K, Message>> out) throws 
Exception {
-
-                       final Iterator<Vertex<K, Tuple3<VV, LongValue, 
LongValue>>> stateIter = state.iterator();
-               
-                       if (stateIter.hasNext()) {
-                               Vertex<K, Tuple3<VV, LongValue, LongValue>> 
vertexWithDegrees = stateIter.next();
-
-                               nextVertex.setField(vertexWithDegrees.f0, 0);
-                               nextVertex.setField(vertexWithDegrees.f1.f0, 1);
-
-                               
messagingFunction.setInDegree(vertexWithDegrees.f1.f1.getValue());
-                               
messagingFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue());
-
-                               messagingFunction.set((Iterator<?>) 
edges.iterator(), out, vertexWithDegrees.getId());
-                               messagingFunction.sendMessages(nextVertex);
-                       }
-               }
-       }
-
-
        // 
--------------------------------------------------------------------------------------------
        //  UTIL methods
        // 
--------------------------------------------------------------------------------------------
 
        /**
-        * Method that builds the messaging function using a coGroup operator 
for a simple vertex(without
+        * Method that builds the scatter function using a coGroup operator for 
a simple vertex (without
         * degrees).
         * It afterwards configures the function with a custom name and 
broadcast variables.
         *
@@ -464,17 +460,17 @@ public class ScatterGatherIteration<K, VV, Message, EV>
         * @param messageTypeInfo
         * @param whereArg the argument for the where within the coGroup
         * @param equalToArg the argument for the equalTo within the coGroup
-        * @return the messaging function
+        * @return the scatter function
         */
-       private CoGroupOperator<?, ?, Tuple2<K, Message>> 
buildMessagingFunction(
+       private CoGroupOperator<?, ?, Tuple2<K, Message>> buildScatterFunction(
                        DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration,
                        TypeInformation<Tuple2<K, Message>> messageTypeInfo, 
int whereArg, int equalToArg,
                        DataSet<LongValue> numberOfVertices) {
 
-               // build the messaging function (co group)
+               // build the scatter function (co group)
                CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
-               MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> messenger =
-                               new MessagingUdfWithEVsSimpleVV<K, VV, Message, 
EV>(messagingFunction, messageTypeInfo);
+               ScatterUdfWithEdgeValues<K, VV, VV, Message, EV> messenger =
+                               new 
ScatterUdfWithEVsSimpleVV<>(scatterFunction, messageTypeInfo);
 
                messages = 
this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
                                .equalTo(equalToArg).with(messenger);
@@ -482,7 +478,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
                // configure coGroup message function with name and broadcast 
variables
                messages = messages.name("Messaging");
                if(this.configuration != null) {
-                       for (Tuple2<String, DataSet<?>> e : 
this.configuration.getMessagingBcastVars()) {
+                       for (Tuple2<String, DataSet<?>> e : 
this.configuration.getScatterBcastVars()) {
                                messages = messages.withBroadcastSet(e.f1, 
e.f0);
                        }
                        if (this.configuration.isOptNumVertices()) {
@@ -494,7 +490,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
        }
 
        /**
-        * Method that builds the messaging function using a coGroup operator 
for a vertex
+        * Method that builds the scatter function using a coGroup operator for 
a vertex
         * containing degree information.
         * It afterwards configures the function with a custom name and 
broadcast variables.
         *
@@ -502,17 +498,17 @@ public class ScatterGatherIteration<K, VV, Message, EV>
         * @param messageTypeInfo
         * @param whereArg the argument for the where within the coGroup
         * @param equalToArg the argument for the equalTo within the coGroup
-        * @return the messaging function
+        * @return the scatter function
         */
-       private CoGroupOperator<?, ?, Tuple2<K, Message>> 
buildMessagingFunctionVerticesWithDegrees(
+       private CoGroupOperator<?, ?, Tuple2<K, Message>> 
buildScatterFunctionVerticesWithDegrees(
                        DeltaIteration<Vertex<K, Tuple3<VV, LongValue, 
LongValue>>, Vertex<K, Tuple3<VV, LongValue, LongValue>>> iteration,
                        TypeInformation<Tuple2<K, Message>> messageTypeInfo, 
int whereArg, int equalToArg,
                        DataSet<LongValue> numberOfVertices) {
 
-               // build the messaging function (co group)
+               // build the scatter function (co group)
                CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
-               MessagingUdfWithEdgeValues<K, Tuple3<VV, LongValue, LongValue>, 
VV, Message, EV> messenger =
-                               new MessagingUdfWithEVsVVWithDegrees<K, VV, 
Message, EV>(messagingFunction, messageTypeInfo);
+               ScatterUdfWithEdgeValues<K, Tuple3<VV, LongValue, LongValue>, 
VV, Message, EV> messenger =
+                               new 
ScatterUdfWithEVsVVWithDegrees<>(scatterFunction, messageTypeInfo);
 
                messages = 
this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
                                .equalTo(equalToArg).with(messenger);
@@ -521,7 +517,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
                messages = messages.name("Messaging");
 
                if (this.configuration != null) {
-                       for (Tuple2<String, DataSet<?>> e : 
this.configuration.getMessagingBcastVars()) {
+                       for (Tuple2<String, DataSet<?>> e : 
this.configuration.getScatterBcastVars()) {
                                messages = messages.withBroadcastSet(e.f1, 
e.f0);
                        }
                        if (this.configuration.isOptNumVertices()) {
@@ -543,7 +539,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
                // set up the iteration operator
                if (this.configuration != null) {
 
-                       
iteration.name(this.configuration.getName("Scatter-gather iteration (" + 
updateFunction + " | " + messagingFunction + ")"));
+                       
iteration.name(this.configuration.getName("Scatter-gather iteration (" + 
gatherFunction + " | " + scatterFunction + ")"));
                        
iteration.parallelism(this.configuration.getParallelism());
                        
iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
 
@@ -554,7 +550,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
                }
                else {
                        // no configuration provided; set default name
-                       iteration.name("Scatter-gather iteration (" + 
updateFunction + " | " + messagingFunction + ")");
+                       iteration.name("Scatter-gather iteration (" + 
gatherFunction + " | " + scatterFunction + ")");
                }
        }
 
@@ -579,21 +575,21 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 
                switch (messagingDirection) {
                        case IN:
-                               messages = buildMessagingFunction(iteration, 
messageTypeInfo, 1, 0, numberOfVertices);
+                               messages = buildScatterFunction(iteration, 
messageTypeInfo, 1, 0, numberOfVertices);
                                break;
                        case OUT:
-                               messages = buildMessagingFunction(iteration, 
messageTypeInfo, 0, 0, numberOfVertices);
+                               messages = buildScatterFunction(iteration, 
messageTypeInfo, 0, 0, numberOfVertices);
                                break;
                        case ALL:
-                               messages = buildMessagingFunction(iteration, 
messageTypeInfo, 1, 0, numberOfVertices)
-                                               
.union(buildMessagingFunction(iteration, messageTypeInfo, 0, 0, 
numberOfVertices)) ;
+                               messages = buildScatterFunction(iteration, 
messageTypeInfo, 1, 0, numberOfVertices)
+                                               
.union(buildScatterFunction(iteration, messageTypeInfo, 0, 0, 
numberOfVertices)) ;
                                break;
                        default:
                                throw new IllegalArgumentException("Illegal 
edge direction");
                }
 
-               VertexUpdateUdf<K, VV, Message> updateUdf =
-                               new VertexUpdateUdfSimpleVV<K, VV, 
Message>(updateFunction, vertexTypes);
+               GatherUdf<K, VV, Message> updateUdf =
+                               new GatherUdfSimpleVV<K, VV, 
Message>(gatherFunction, vertexTypes);
 
                // build the update function (co group)
                CoGroupOperator<?, ?, Vertex<K, VV>> updates =
@@ -624,7 +620,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 
                DataSet<Tuple2<K, Message>> messages;
 
-               
this.updateFunction.setOptDegrees(this.configuration.isOptDegrees());
+               
this.gatherFunction.setOptDegrees(this.configuration.isOptDegrees());
 
                DataSet<Tuple2<K, LongValue>> inDegrees = graph.inDegrees();
                DataSet<Tuple2<K, LongValue>> outDegrees = graph.outDegrees();
@@ -634,7 +630,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 
                                        @Override
                                        public void join(Tuple2<K, LongValue> 
first, Tuple2<K, LongValue> second, Collector<Tuple3<K, LongValue, LongValue>> 
out) {
-                                               out.collect(new Tuple3<K, 
LongValue, LongValue>(first.f0, first.f1, second.f1));
+                                               out.collect(new 
Tuple3<>(first.f0, first.f1, second.f1));
                                        }
                                
}).withForwardedFieldsFirst("f0;f1").withForwardedFieldsSecond("f1");
 
@@ -644,9 +640,8 @@ public class ScatterGatherIteration<K, VV, Message, EV>
                                        @Override
                                        public void join(Vertex<K, VV> vertex, 
Tuple3<K, LongValue, LongValue> degrees,
                                                                        
Collector<Vertex<K, Tuple3<VV, LongValue, LongValue>>> out) throws Exception {
-
-                                               out.collect(new Vertex<K, 
Tuple3<VV, LongValue, LongValue>>(vertex.getId(),
-                                                               new Tuple3<VV, 
LongValue, LongValue>(vertex.getValue(), degrees.f1, degrees.f2)));
+                                               out.collect(new 
Vertex<>(vertex.getId(),
+                                                                       new 
Tuple3<>(vertex.getValue(), degrees.f1, degrees.f2)));
                                        }
                                }).withForwardedFieldsFirst("f0");
 
@@ -659,22 +654,22 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 
                switch (messagingDirection) {
                        case IN:
-                               messages = 
buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, 
numberOfVertices);
+                               messages = 
buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, 
numberOfVertices);
                                break;
                        case OUT:
-                               messages = 
buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0, 
numberOfVertices);
+                               messages = 
buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0, 
numberOfVertices);
                                break;
                        case ALL:
-                               messages = 
buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, 
numberOfVertices)
-                                               
.union(buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 
0, numberOfVertices)) ;
+                               messages = 
buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, 
numberOfVertices)
+                                               
.union(buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 
0, numberOfVertices)) ;
                                break;
                        default:
                                throw new IllegalArgumentException("Illegal 
edge direction");
                }
 
                @SuppressWarnings({ "unchecked", "rawtypes" })
-               VertexUpdateUdf<K, Tuple3<VV, LongValue, LongValue>, Message> 
updateUdf =
-                               new 
VertexUpdateUdfVVWithDegrees(updateFunction, vertexTypes);
+               GatherUdf<K, Tuple3<VV, LongValue, LongValue>, Message> 
updateUdf =
+                               new GatherUdfVVWithDegrees(gatherFunction, 
vertexTypes);
 
                // build the update function (co group)
                CoGroupOperator<?, ?, Vertex<K, Tuple3<VV, LongValue, 
LongValue>>> updates =
@@ -690,7 +685,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
                                new MapFunction<Vertex<K, Tuple3<VV, LongValue, 
LongValue>>, Vertex<K, VV>>() {
 
                                        public Vertex<K, VV> map(Vertex<K, 
Tuple3<VV, LongValue, LongValue>> vertex) {
-                                               return new Vertex<K, 
VV>(vertex.getId(), vertex.getValue().f0);
+                                               return new 
Vertex<>(vertex.getId(), vertex.getValue().f0);
                                        }
                                });
        }
@@ -700,7 +695,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
                // configure coGroup update function with name and broadcast 
variables
                updates = updates.name("Vertex State Updates");
                if (this.configuration != null) {
-                       for (Tuple2<String, DataSet<?>> e : 
this.configuration.getUpdateBcastVars()) {
+                       for (Tuple2<String, DataSet<?>> e : 
this.configuration.getGatherBcastVars()) {
                                updates = updates.withBroadcastSet(e.f1, e.f0);
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
deleted file mode 100644
index 9085432..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.spargel;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-/**
- * This class must be extended by functions that compute the state of the vertex depending on the old state and the
- * incoming messages. The central method is {@link #updateVertex(Vertex, MessageIterator)}, which is
- * invoked once per vertex per superstep.
- * 
- * {@code <K>} The vertex key type.
- * {@code <VV>} The vertex value type.
- * {@code <Message>} The message type.
- */
-public abstract class VertexUpdateFunction<K, VV, Message> implements 
Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       // --------------------------------------------------------------------------------------------
-       //  Attributes that allow vertices to access their in/out degrees and the total number of vertices
-       //  inside an iteration.
-       // --------------------------------------------------------------------------------------------
-
-       private long numberOfVertices = -1L;
-
-       /**
-        * Retrieves the number of vertices in the graph.
-        * @return the number of vertices if the {@link org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)}
-        * option has been set; -1 otherwise.
-        */
-       public long getNumberOfVertices() {
-               return numberOfVertices;
-       }
-
-       void setNumberOfVertices(long numberOfVertices) {
-               this.numberOfVertices = numberOfVertices;
-       }
-
-       //---------------------------------------------------------------------------------------------
-
-       private boolean optDegrees;
-
-       boolean isOptDegrees() {
-               return optDegrees;
-       }
-
-       void setOptDegrees(boolean optDegrees) {
-               this.optDegrees = optDegrees;
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  Public API Methods
-       // --------------------------------------------------------------------------------------------
-       
-       /**
-        * This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
-        * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
-        * state is changed, it will trigger the sending of messages via the {@link MessagingFunction}.
-        * 
-        * @param vertex The vertex.
-        * @param inMessages The incoming messages to this vertex.
-        * 
-        * @throws Exception The computation may throw exceptions, which causes 
the superstep to fail.
-        */
-       public abstract void updateVertex(Vertex<K, VV> vertex, 
MessageIterator<Message> inMessages) throws Exception;
-       
-       /**
-        * This method is executed one per superstep before the vertex update 
function is invoked for each vertex.
-        * 
-        * @throws Exception Exceptions in the pre-superstep phase cause the 
superstep to fail.
-        */
-       public void preSuperstep() throws Exception {}
-       
-       /**
-        * This method is executed one per superstep after the vertex update 
function has been invoked for each vertex.
-        * 
-        * @throws Exception Exceptions in the post-superstep phase cause the 
superstep to fail.
-        */
-       public void postSuperstep() throws Exception {}
-       
-       /**
-        * Sets the new value of this vertex. Setting a new value triggers the 
sending of outgoing messages from this vertex.
-        *
-        * This should be called at most once per updateVertex.
-        * 
-        * @param newValue The new vertex value.
-        */
-       public void setNewVertexValue(VV newValue) {
-               if(setNewVertexValueCalled) {
-                       throw new IllegalStateException("setNewVertexValue 
should only be called at most once per updateVertex");
-               }
-               setNewVertexValueCalled = true;
-               if(isOptDegrees()) {
-                       outValWithDegrees.f1.f0 = newValue;
-                       outWithDegrees.collect(outValWithDegrees);
-               } else {
-                       outVal.setValue(newValue);
-                       out.collect(outVal);
-               }
-       }
-       
-       /**
-        * Gets the number of the superstep, starting at <tt>1</tt>.
-        * 
-        * @return The number of the current superstep.
-        */
-       public int getSuperstepNumber() {
-               return this.runtimeContext.getSuperstepNumber();
-       }
-       
-       /**
-        * Gets the iteration aggregator registered under the given name. The 
iteration aggregator combines
-        * all aggregates globally once per superstep and makes them available 
in the next superstep.
-        * 
-        * @param name The name of the aggregator.
-        * @return The aggregator registered under this name, or null, if no 
aggregator was registered.
-        */
-       public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-               return this.runtimeContext.<T>getIterationAggregator(name);
-       }
-       
-       /**
-        * Get the aggregated value that an aggregator computed in the previous 
iteration.
-        * 
-        * @param name The name of the aggregator.
-        * @return The aggregated value of the previous iteration.
-        */
-       public <T extends Value> T getPreviousIterationAggregate(String name) {
-               return 
this.runtimeContext.<T>getPreviousIterationAggregate(name);
-       }
-       
-       /**
-        * Gets the broadcast data set registered under the given name. 
Broadcast data sets
-        * are available on all parallel instances of a function. They can be 
registered via
-        * {@link org.apache.flink.graph.spargel.ScatterGatherConfiguration#addBroadcastSetForUpdateFunction(String, org.apache.flink.api.java.DataSet)}.
-        * 
-        * @param name The name under which the broadcast set is registered.
-        * @return The broadcast data set.
-        */
-       public <T> Collection<T> getBroadcastSet(String name) {
-               return this.runtimeContext.<T>getBroadcastVariable(name);
-       }
-       
-       // --------------------------------------------------------------------------------------------
-       //  internal methods
-       // --------------------------------------------------------------------------------------------
-       
-       private IterationRuntimeContext runtimeContext;
-
-       private Collector<Vertex<K, VV>> out;
-       
-       private Collector<Vertex<K, Tuple3<VV, Long, Long>>> outWithDegrees;
-
-       private Vertex<K, VV> outVal;
-
-       private Vertex<K, Tuple3<VV, Long, Long>> outValWithDegrees;
-
-       private long inDegree = -1;
-
-       private long outDegree = -1;
-
-       private boolean setNewVertexValueCalled;
-
-       void init(IterationRuntimeContext context) {
-               this.runtimeContext = context;
-       }
-
-       void setOutput(Vertex<K, VV> outVal, Collector<Vertex<K, VV>> out) {
-               this.outVal = outVal;
-               this.out = out;
-               setNewVertexValueCalled = false;
-       }
-
-       @SuppressWarnings({ "unchecked", "rawtypes" })
-       <ValueWithDegree> void setOutputWithDegrees(Vertex<K, ValueWithDegree> 
outVal,
-                       Collector out) {
-               this.outValWithDegrees = (Vertex<K, Tuple3<VV, Long, Long>>) 
outVal;
-               this.outWithDegrees = out;
-               setNewVertexValueCalled = false;
-       }
-
-       /**
-        * Retrieves the vertex in-degree (number of in-coming edges).
-        * @return The in-degree of this vertex
-        */
-       public long getInDegree() {
-               return inDegree;
-       }
-
-       void setInDegree(long inDegree) {
-               this.inDegree = inDegree;
-       }
-
-       /**
-        * Retrieve the vertex out-degree (number of out-going edges).
-        * @return The out-degree of this vertex
-        */
-       public long getOutDegree() {
-               return outDegree;
-       }
-
-       void setOutDegree(long outDegree) {
-               this.outDegree = outDegree;
-       }
-
-       /**
-        * In order to hide the Tuple3(actualValue, inDegree, OutDegree) vertex value from the user,
-        * another function will be called from {@link org.apache.flink.graph.spargel.ScatterGatherIteration}.
-        *
-        * This function will retrieve the vertex from the vertexState and will 
set its degrees, afterwards calling
-        * the regular updateVertex function.
-        *
-        * @param vertexState
-        * @param inMessages
-        * @throws Exception
-        */
-       @SuppressWarnings("unchecked")
-       <VertexWithDegree> void 
updateVertexFromScatterGatherIteration(Vertex<K, VertexWithDegree> vertexState,
-                                                                               
                MessageIterator<Message> inMessages) throws Exception {
-
-               Vertex<K, VV> vertex = new Vertex<K, VV>(vertexState.f0,
-                               ((Tuple3<VV, Long, 
Long>)vertexState.getValue()).f0);
-
-               updateVertex(vertex, inMessages);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
index 3a750af..14c2fb4 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
@@ -75,8 +75,8 @@ public class SpargelCompilerTest extends CompilerTestBase {
                                Graph<Long, Long, NullValue> graph = 
Graph.fromDataSet(initialVertices, edges, env);
 
                                DataSet<Vertex<Long, Long>> result = 
graph.runScatterGatherIteration(
-                                               new 
ConnectedComponents.CCUpdater<Long, Long>(),
-                                               new 
ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO), 100)
+                                               new 
ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO),
+                                               new 
ConnectedComponents.CCUpdater<Long, Long>(), 100)
                                                .getVertices();
                                
                                result.output(new 
DiscardingOutputFormat<Vertex<Long, Long>>());
@@ -157,12 +157,12 @@ public class SpargelCompilerTest extends CompilerTestBase {
                                Graph<Long, Long, NullValue> graph = 
Graph.fromDataSet(initialVertices, edges, env);
 
                                ScatterGatherConfiguration parameters = new 
ScatterGatherConfiguration();
-                               
parameters.addBroadcastSetForMessagingFunction(BC_VAR_NAME, bcVar);
-                               
parameters.addBroadcastSetForUpdateFunction(BC_VAR_NAME, bcVar);
+                               
parameters.addBroadcastSetForScatterFunction(BC_VAR_NAME, bcVar);
+                               
parameters.addBroadcastSetForGatherFunction(BC_VAR_NAME, bcVar);
 
                                DataSet<Vertex<Long, Long>> result = 
graph.runScatterGatherIteration(
-                                               new 
ConnectedComponents.CCUpdater<Long, Long>(),
-                                               new 
ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO), 100)
+                                               new 
ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO),
+                                               new 
ConnectedComponents.CCUpdater<Long, Long>(), 100)
                                                .getVertices();
                                        
                                result.output(new 
DiscardingOutputFormat<Vertex<Long, Long>>());

Reply via email to