Repository: flink
Updated Branches:
  refs/heads/master 6c6b17b4d -> 7324b9c17


http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
index 7755708..2c7e093 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
@@ -16,28 +16,27 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.graph.spargel;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.junit.Test;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.DeltaIterationResultSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.TwoInputUdfOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @SuppressWarnings("serial")
 public class SpargelTranslationTest {
@@ -46,28 +45,28 @@ public class SpargelTranslationTest {
        public void testTranslationPlainEdges() {
                try {
                        final String ITERATION_NAME = "Test Name";
-                       
+
                        final String AGGREGATOR_NAME = "AggregatorName";
-                       
+
                        final String BC_SET_MESSAGES_NAME = "borat messages";
-                       
+
                        final String BC_SET_UPDATES_NAME = "borat updates";
 
                        final int NUM_ITERATIONS = 13;
-                       
+
                        final int ITERATION_parallelism = 77;
-                       
-                       
+
+
                        ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
+
                        DataSet<Long> bcMessaging = env.fromElements(1L);
                        DataSet<Long> bcUpdate = env.fromElements(1L);
-                       
+
                        DataSet<Vertex<String, Double>> result;
-                       
+
                        // ------------ construct the test program 
------------------
                        {
-                               
+
                                DataSet<Tuple2<String, Double>> initialVertices 
= env.fromElements(new Tuple2<>("abc", 3.44));
 
                                DataSet<Tuple2<String, String>> edges = 
env.fromElements(new Tuple2<>("a", "c"));
@@ -83,41 +82,41 @@ public class SpargelTranslationTest {
 
                                ScatterGatherConfiguration parameters = new 
ScatterGatherConfiguration();
 
-                               
parameters.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, 
bcMessaging);
-                               
parameters.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcUpdate);
+                               
parameters.addBroadcastSetForScatterFunction(BC_SET_MESSAGES_NAME, bcMessaging);
+                               
parameters.addBroadcastSetForGatherFunction(BC_SET_UPDATES_NAME, bcUpdate);
                                parameters.setName(ITERATION_NAME);
                                
parameters.setParallelism(ITERATION_parallelism);
                                parameters.registerAggregator(AGGREGATOR_NAME, 
new LongSumAggregator());
 
-                               result = graph.runScatterGatherIteration(new 
UpdateFunction(), new MessageFunctionNoEdgeValue(),
+                               result = graph.runScatterGatherIteration(new 
MessageFunctionNoEdgeValue(), new UpdateFunction(),
                                                NUM_ITERATIONS, 
parameters).getVertices();
 
                                result.output(new 
DiscardingOutputFormat<Vertex<String, Double>>());
                        }
-                       
-                       
+
+
                        // ------------- validate the java program 
----------------
-                       
+
                        assertTrue(result instanceof DeltaIterationResultSet);
-                       
+
                        DeltaIterationResultSet<?, ?> resultSet = 
(DeltaIterationResultSet<?, ?>) result;
                        DeltaIteration<?, ?> iteration = 
resultSet.getIterationHead();
-                       
+
                        // check the basic iteration properties
                        assertEquals(NUM_ITERATIONS, 
resultSet.getMaxIterations());
                        assertArrayEquals(new int[] {0}, 
resultSet.getKeyPositions());
                        assertEquals(ITERATION_parallelism, 
iteration.getParallelism());
                        assertEquals(ITERATION_NAME, iteration.getName());
-                       
+
                        assertEquals(AGGREGATOR_NAME, 
iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
-                       
+
                        // validate that the semantic properties are set as 
they should
                        TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = 
(TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
                        
assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 
0).contains(0));
                        
assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 
0).contains(0));
-                       
+
                        TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = 
(TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
-                       
+
                        // validate that the broadcast sets are forwarded
                        assertEquals(bcUpdate, 
solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
                        assertEquals(bcMessaging, 
edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
@@ -128,29 +127,29 @@ public class SpargelTranslationTest {
                        fail(e.getMessage());
                }
        }
-       
+
        @Test
        public void testTranslationPlainEdgesWithForkedBroadcastVariable() {
                try {
                        final String ITERATION_NAME = "Test Name";
-                       
+
                        final String AGGREGATOR_NAME = "AggregatorName";
-                       
+
                        final String BC_SET_MESSAGES_NAME = "borat messages";
-                       
+
                        final String BC_SET_UPDATES_NAME = "borat updates";
 
                        final int NUM_ITERATIONS = 13;
-                       
+
                        final int ITERATION_parallelism = 77;
-                       
-                       
+
+
                        ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
+
                        DataSet<Long> bcVar = env.fromElements(1L);
-                       
+
                        DataSet<Vertex<String, Double>> result;
-                       
+
                        // ------------ construct the test program 
------------------
                        {
 
@@ -169,41 +168,41 @@ public class SpargelTranslationTest {
 
                                ScatterGatherConfiguration parameters = new 
ScatterGatherConfiguration();
 
-                               
parameters.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcVar);
-                               
parameters.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcVar);
+                               
parameters.addBroadcastSetForScatterFunction(BC_SET_MESSAGES_NAME, bcVar);
+                               
parameters.addBroadcastSetForGatherFunction(BC_SET_UPDATES_NAME, bcVar);
                                parameters.setName(ITERATION_NAME);
                                
parameters.setParallelism(ITERATION_parallelism);
                                parameters.registerAggregator(AGGREGATOR_NAME, 
new LongSumAggregator());
-                               
-                               result = graph.runScatterGatherIteration(new 
UpdateFunction(), new MessageFunctionNoEdgeValue(),
+
+                               result = graph.runScatterGatherIteration(new 
MessageFunctionNoEdgeValue(), new UpdateFunction(),
                                                NUM_ITERATIONS, 
parameters).getVertices();
 
                                result.output(new 
DiscardingOutputFormat<Vertex<String, Double>>());
                        }
-                       
-                       
+
+
                        // ------------- validate the java program 
----------------
-                       
+
                        assertTrue(result instanceof DeltaIterationResultSet);
-                       
+
                        DeltaIterationResultSet<?, ?> resultSet = 
(DeltaIterationResultSet<?, ?>) result;
                        DeltaIteration<?, ?> iteration = 
resultSet.getIterationHead();
-                       
+
                        // check the basic iteration properties
                        assertEquals(NUM_ITERATIONS, 
resultSet.getMaxIterations());
                        assertArrayEquals(new int[] {0}, 
resultSet.getKeyPositions());
                        assertEquals(ITERATION_parallelism, 
iteration.getParallelism());
                        assertEquals(ITERATION_NAME, iteration.getName());
-                       
+
                        assertEquals(AGGREGATOR_NAME, 
iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
-                       
+
                        // validate that the semantic properties are set as 
they should
                        TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = 
(TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
                        
assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 
0).contains(0));
                        
assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 
0).contains(0));
-                       
+
                        TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = 
(TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
-                       
+
                        // validate that the broadcast sets are forwarded
                        assertEquals(bcVar, 
solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
                        assertEquals(bcVar, 
edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
@@ -214,18 +213,18 @@ public class SpargelTranslationTest {
                        fail(e.getMessage());
                }
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
-       
-       public static class UpdateFunction extends VertexUpdateFunction<String, 
Double, Long> {
+
+       private static class MessageFunctionNoEdgeValue extends 
ScatterFunction<String, Double, Long, NullValue> {
 
                @Override
-               public void updateVertex(Vertex<String, Double> vertex, 
MessageIterator<Long> inMessages) {}
+               public void sendMessages(Vertex<String, Double> vertex) {}
        }
-       
-       public static class MessageFunctionNoEdgeValue extends 
MessagingFunction<String, Double, Long, NullValue> {
+
+       private static class UpdateFunction extends GatherFunction<String, 
Double, Long> {
 
                @Override
-               public void sendMessages(Vertex<String, Double> vertex) {}
+               public void updateVertex(Vertex<String, Double> vertex, 
MessageIterator<Long> inMessages) {}
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
index b1c2a2c..cb7573c 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
@@ -24,9 +24,9 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.utils.VertexToTuple2Map;
 import org.junit.Assert;
 import org.junit.Test;
@@ -36,47 +36,46 @@ public class CollectionModeSuperstepITCase {
 
        /**
         * Dummy iteration to test that the supersteps are correctly incremented
-        * and can be retrieved from inside the updated and messaging functions.
+        * and can be retrieved from inside the scatter and gather functions.
         * All vertices start with value 1 and increase their value by 1
         * in each iteration. 
         */
        @Test
        public void testProgram() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.createCollectionsEnvironment();
-               
-               Graph<Long, Long, Long> graph = 
Graph.fromCollection(TestGraphUtils.getLongLongVertices(), 
+
+               Graph<Long, Long, Long> graph = 
Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
                                TestGraphUtils.getLongLongEdges(), 
env).mapVertices(new AssignOneMapper());
-               
+
                Graph<Long, Long, Long> result = 
graph.runScatterGatherIteration(
-                               new UpdateFunction(), new MessageFunction(), 
10);
+                               new MessageFunction(), new UpdateFunction(), 
10);
 
                result.getVertices().map(
                                new VertexToTuple2Map<Long, Long>()).output(
                                                new 
DiscardingOutputFormat<Tuple2<Long, Long>>());
                env.execute();
        }
-       
-       public static final class UpdateFunction extends 
VertexUpdateFunction<Long, Long, Long> {
+
+       private static final class MessageFunction extends 
ScatterFunction<Long, Long, Long, Long> {
                @Override
-               public void updateVertex(Vertex<Long, Long> vertex, 
MessageIterator<Long> inMessages) {
+               public void sendMessages(Vertex<Long, Long> vertex) {
                        long superstep = getSuperstepNumber();
                        Assert.assertEquals(true, vertex.getValue() == 
superstep);
-                       setNewVertexValue(vertex.getValue() + 1);
+                       //send message to keep vertices active
+                       sendMessageToAllNeighbors(vertex.getValue());
                }
        }
-       
-       public static final class MessageFunction extends 
MessagingFunction<Long, Long, Long, Long> {
+
+       private static final class UpdateFunction extends GatherFunction<Long, 
Long, Long> {
                @Override
-               public void sendMessages(Vertex<Long, Long> vertex) {
+               public void updateVertex(Vertex<Long, Long> vertex, 
MessageIterator<Long> inMessages) {
                        long superstep = getSuperstepNumber();
                        Assert.assertEquals(true, vertex.getValue() == 
superstep);
-                       //send message to keep vertices active
-                       sendMessageToAllNeighbors(vertex.getValue());
+                       setNewVertexValue(vertex.getValue() + 1);
                }
        }
 
-       public static final class AssignOneMapper implements 
MapFunction<Vertex<Long, Long>, Long> {
-
+       private static final class AssignOneMapper implements 
MapFunction<Vertex<Long, Long>, Long> {
                public Long map(Vertex<Long, Long> value) {
                        return 1L;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
index f14e002..fcd0d82 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.graph.test;
 
-import java.util.HashSet;
-import java.util.List;
-
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
@@ -31,18 +28,21 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
 import org.apache.flink.graph.spargel.ScatterGatherIteration;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.VertexToTuple2Map;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.types.LongValue;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.apache.flink.graph.utils.VertexToTuple2Map;
+
+import java.util.HashSet;
+import java.util.List;
 
 @RunWith(Parameterized.class)
 public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase 
{
@@ -59,30 +59,30 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                 * Test Graph's runScatterGatherIteration when configuration 
parameters are provided
                 */
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               Graph<Long, Long, Long> graph = 
Graph.fromCollection(TestGraphUtils.getLongLongVertices(), 
+
+               Graph<Long, Long, Long> graph = 
Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
                                TestGraphUtils.getLongLongEdges(), 
env).mapVertices(new AssignOneMapper());
 
                // create the configuration object
                ScatterGatherConfiguration parameters = new 
ScatterGatherConfiguration();
 
-               parameters.addBroadcastSetForUpdateFunction("updateBcastSet", 
env.fromElements(1, 2, 3));
-               
parameters.addBroadcastSetForMessagingFunction("messagingBcastSet", 
env.fromElements(4, 5, 6));
+               
parameters.addBroadcastSetForScatterFunction("messagingBcastSet", 
env.fromElements(4, 5, 6));
+               parameters.addBroadcastSetForGatherFunction("updateBcastSet", 
env.fromElements(1, 2, 3));
                parameters.registerAggregator("superstepAggregator", new 
LongSumAggregator());
                parameters.setOptNumVertices(true);
 
                Graph<Long, Long, Long> res = graph.runScatterGatherIteration(
-                               new UpdateFunction(), new MessageFunction(), 
10, parameters);
+                               new MessageFunction(), new UpdateFunction(), 
10, parameters);
 
                DataSet<Vertex<Long,Long>> data = res.getVertices();
                List<Vertex<Long,Long>> result= data.collect();
-        
+
                expectedResult = "1,11\n" +
                                                "2,11\n" +
                                                "3,11\n" +
                                                "4,11\n" +
                                                "5,11";
-               
+
                compareResultAsTuples(result, expectedResult);
        }
 
@@ -95,29 +95,29 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                ScatterGatherIteration<Long, Long, Long, Long> iteration = 
ScatterGatherIteration
-                               
.withEdges(TestGraphUtils.getLongLongEdgeData(env), new DummyUpdateFunction(), 
-                                               new DummyMessageFunction(), 10);
-               
+                               
.withEdges(TestGraphUtils.getLongLongEdgeData(env), new DummyMessageFunction(),
+                                               new DummyUpdateFunction(), 10);
+
                ScatterGatherConfiguration parameters = new 
ScatterGatherConfiguration();
                parameters.setName("gelly iteration");
                parameters.setParallelism(2);
                parameters.setSolutionSetUnmanagedMemory(true);
-               
+
                iteration.configure(parameters);
-               
+
                Assert.assertEquals("gelly iteration", 
iteration.getIterationConfiguration().getName(""));
                Assert.assertEquals(2, 
iteration.getIterationConfiguration().getParallelism());
                Assert.assertEquals(true, 
iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory());
 
                DataSet<Vertex<Long, Long>> data = 
TestGraphUtils.getLongLongVertexData(env).runOperation(iteration);
         List<Vertex<Long,Long>> result= data.collect();
-        
+
                expectedResult = "1,11\n" +
                                                "2,12\n" +
                                                "3,13\n" +
                                                "4,14\n" +
                                                "5,15";
-               
+
                compareResultAsTuples(result, expectedResult);
        }
 
@@ -128,23 +128,23 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                 * i.e. degrees and numVertices will be -1, EdgeDirection will 
be OUT.
                 */
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               Graph<Long, Long, Long> graph = 
Graph.fromCollection(TestGraphUtils.getLongLongVertices(), 
+
+               Graph<Long, Long, Long> graph = 
Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
                                TestGraphUtils.getLongLongEdges(), 
env).mapVertices(new AssignOneMapper());
 
                Graph<Long, Long, Long> res = graph.runScatterGatherIteration(
-                               new UpdateFunctionDefault(), new 
MessageFunctionDefault(), 5);
+                               new MessageFunctionDefault(), new 
UpdateFunctionDefault(), 5);
+
 
-               
                DataSet<Tuple2<Long, Long>> data = res.getVertices().map(new 
VertexToTuple2Map<Long, Long>());
                List<Tuple2<Long, Long>> result= data.collect();
-        
+
                expectedResult = "1,6\n" +
                                                "2,6\n" +
                                                "3,6\n" +
                                                "4,6\n" +
                                                "5,6";
-               
+
                compareResultAsTuples(result, expectedResult);
        }
 
@@ -162,7 +162,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                                .mapVertices(new InitialiseHashSetMapper());
 
                DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph
-                               .runScatterGatherIteration(new 
VertexUpdateDirection(), new IdMessengerTrg(), 5)
+                               .runScatterGatherIteration(new 
IdMessengerTrg(), new VertexUpdateDirection(), 5)
                                .getVertices();
 
         List<Vertex<Long, HashSet<Long>>> result= resultedVertices.collect();
@@ -172,7 +172,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                                "3,[1, 2]\n" +
                                "4,[3]\n" +
                                "5,[3, 4]";
-               
+
                compareResultAsTuples(result, expectedResult);
        }
 
@@ -195,7 +195,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                parameters.setDirection(EdgeDirection.IN);
 
                DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph
-                               .runScatterGatherIteration(new 
VertexUpdateDirection(), new IdMessengerSrc(), 5, parameters)
+                               .runScatterGatherIteration(new 
IdMessengerSrc(), new VertexUpdateDirection(), 5, parameters)
                                .getVertices();
 
         List<Vertex<Long, HashSet<Long>>> result= resultedVertices.collect();
@@ -205,7 +205,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                                "3,[4, 5]\n" +
                                "4,[5]\n" +
                                "5,[1]";
-               
+
                compareResultAsTuples(result, expectedResult);
        }
 
@@ -228,7 +228,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                parameters.setDirection(EdgeDirection.ALL);
 
                DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph
-                               .runScatterGatherIteration(new 
VertexUpdateDirection(), new IdMessengerAll(), 5, parameters)
+                               .runScatterGatherIteration(new 
IdMessengerAll(), new VertexUpdateDirection(), 5, parameters)
                                .getVertices();
 
                List<Vertex<Long, HashSet<Long>>> result= 
resultedVertices.collect();
@@ -238,7 +238,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                                "3,[1, 2, 4, 5]\n" +
                                "4,[3, 5]\n" +
                                "5,[1, 3, 4]";
-               
+
                compareResultAsTuples(result, expectedResult);
        }
 
@@ -261,7 +261,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                parameters.setDirection(EdgeDirection.IN);
 
                DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph
-                               .runScatterGatherIteration(new 
VertexUpdateDirection(), new SendMsgToAll(), 5, parameters)
+                               .runScatterGatherIteration(new SendMsgToAll(), 
new VertexUpdateDirection(), 5, parameters)
                                .getVertices();
 
                List<Vertex<Long, HashSet<Long>>> result = 
resultedVertices.collect();
@@ -271,7 +271,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                                "3,[4, 5]\n" +
                                "4,[5]\n" +
                                "5,[1]";
-               
+
                compareResultAsTuples(result, expectedResult);
        }
 
@@ -294,7 +294,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                parameters.setDirection(EdgeDirection.OUT);
 
                DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph
-                               .runScatterGatherIteration(new 
VertexUpdateDirection(), new SendMsgToAll(), 5, parameters)
+                               .runScatterGatherIteration(new SendMsgToAll(), 
new VertexUpdateDirection(), 5, parameters)
                                .getVertices();
 
                List<Vertex<Long, HashSet<Long>>> result = 
resultedVertices.collect();
@@ -304,7 +304,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                                "3,[1, 2]\n" +
                                "4,[3]\n" +
                                "5,[3, 4]";
-               
+
                compareResultAsTuples(result, expectedResult);
        }
 
@@ -327,7 +327,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                parameters.setDirection(EdgeDirection.ALL);
 
                DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph
-                               .runScatterGatherIteration(new 
VertexUpdateDirection(), new SendMsgToAll(), 5, parameters)
+                               .runScatterGatherIteration(new SendMsgToAll(), 
new VertexUpdateDirection(), 5, parameters)
                                .getVertices();
 
                List<Vertex<Long, HashSet<Long>>> result = 
resultedVertices.collect();
@@ -337,7 +337,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                                "3,[1, 2, 4, 5]\n" +
                                "4,[3, 5]\n" +
                                "5,[1, 3, 4]";
-               
+
                compareResultAsTuples(result, expectedResult);
        }
 
@@ -353,8 +353,8 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                Graph<Long, Long, Long> graph = 
Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
                                TestGraphUtils.getLongLongEdges(), env);
 
-               DataSet<Vertex<Long, Long>> verticesWithNumVertices = 
graph.runScatterGatherIteration(new UpdateFunctionNumVertices(),
-                               new DummyMessageFunction(), 2).getVertices();
+               DataSet<Vertex<Long, Long>> verticesWithNumVertices = 
graph.runScatterGatherIteration(new DummyMessageFunction(),
+                               new UpdateFunctionNumVertices(), 
2).getVertices();
 
                List<Vertex<Long, Long>> result= 
verticesWithNumVertices.collect();
 
@@ -363,7 +363,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                                "3,-1\n" +
                                "4,-1\n" +
                                "5,-1";
-               
+
                compareResultAsTuples(result, expectedResult);
        }
 
@@ -371,9 +371,9 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
        public void testInDegreesSet() throws Exception {
 
                /*
-                * Test that if the degrees are set, they can be accessed in 
every superstep 
+                * Test that if the degrees are set, they can be accessed in 
every superstep
                 * inside the update function and the value
-                * is correctly computed for degrees in the messaging function.
+                * is correctly computed for degrees in the scatter function.
                 */
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
@@ -386,7 +386,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                parameters.setOptDegrees(true);
 
                DataSet<Vertex<Long, Long>> verticesWithDegrees = 
graph.runScatterGatherIteration(
-                               new UpdateFunctionInDegrees(), new 
DegreesMessageFunction(), 5, parameters).getVertices();
+                               new DegreesMessageFunction(), new 
UpdateFunctionInDegrees(), 5, parameters).getVertices();
 
                List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
 
@@ -395,7 +395,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                                "3,2\n" +
                                "4,1\n" +
                                "5,2";
-               
+
                compareResultAsTuples(result, expectedResult);
        }
 
@@ -411,7 +411,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                                TestGraphUtils.getLongLongEdges(), env);
 
                DataSet<Vertex<Long, Long>> verticesWithDegrees = 
graph.runScatterGatherIteration(
-                               new UpdateFunctionInDegrees(), new 
DummyMessageFunction(), 2).getVertices();
+                               new DummyMessageFunction(), new 
UpdateFunctionInDegrees(), 2).getVertices();
 
                List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
 
@@ -420,7 +420,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                                "3,-1\n" +
                                "4,-1\n" +
                                "5,-1";
-               
+
                compareResultAsTuples(result, expectedResult);
        }
 
@@ -430,7 +430,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                /*
                 * Test that if the degrees are set, they can be accessed in 
every superstep
                 * inside the update function and the value
-                * is correctly computed for degrees in the messaging function.
+                * is correctly computed for degrees in the scatter function.
                 */
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
@@ -443,7 +443,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                parameters.setOptDegrees(true);
 
                DataSet<Vertex<Long, Long>> verticesWithDegrees = 
graph.runScatterGatherIteration(
-                               new UpdateFunctionOutDegrees(), new 
DegreesMessageFunction(), 5, parameters).getVertices();
+                               new DegreesMessageFunction(), new 
UpdateFunctionOutDegrees(), 5, parameters).getVertices();
 
                List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
 
@@ -452,7 +452,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                                "3,2\n" +
                                "4,1\n" +
                                "5,1";
-               
+
                compareResultAsTuples(result, expectedResult);
        }
 
@@ -468,7 +468,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                                TestGraphUtils.getLongLongEdges(), env);
 
                DataSet<Vertex<Long, Long>> verticesWithDegrees = 
graph.runScatterGatherIteration(
-                               new UpdateFunctionInDegrees(), new 
DummyMessageFunction(), 2).getVertices();
+                               new DummyMessageFunction(), new 
UpdateFunctionInDegrees(), 2).getVertices();
 
                List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
 
@@ -477,7 +477,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                                "3,-1\n" +
                                "4,-1\n" +
                                "5,-1";
-               
+
                compareResultAsTuples(result, expectedResult);
        }
 
@@ -500,7 +500,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                parameters.setDirection(EdgeDirection.ALL);
 
                DataSet<Vertex<Long, Boolean>> verticesWithNumNeighbors = 
graph.runScatterGatherIteration(
-                               new VertexUpdateNumNeighbors(), new 
IdMessenger(), 1, parameters).getVertices();
+                               new IdMessenger(), new 
VertexUpdateNumNeighbors(), 1, parameters).getVertices();
 
                List<Vertex<Long, Boolean>> result= 
verticesWithNumNeighbors.collect();
 
@@ -509,107 +509,107 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                                "3,true\n" +
                                "4,true\n" +
                                "5,true";
-               
+
                compareResultAsTuples(result, expectedResult);
        }
 
        @SuppressWarnings("serial")
-       public static final class UpdateFunction extends 
VertexUpdateFunction<Long, Long, Long> {
-
-               LongSumAggregator aggregator = new LongSumAggregator();
+       private static final class MessageFunction extends 
ScatterFunction<Long, Long, Long, Long> {
 
                @Override
                public void preSuperstep() {
-                       
+
                        // test bcast variable
                        @SuppressWarnings("unchecked")
-                       List<Tuple1<Integer>> bcastSet = 
(List<Tuple1<Integer>>)(List<?>)getBroadcastSet("updateBcastSet");
-                       Assert.assertEquals(1, bcastSet.get(0));
-                       Assert.assertEquals(2, bcastSet.get(1));
-                       Assert.assertEquals(3, bcastSet.get(2));
-                       
-                       // test aggregator
-                       aggregator = 
getIterationAggregator("superstepAggregator");
+                       List<Tuple1<Integer>> bcastSet = 
(List<Tuple1<Integer>>)(List<?>)getBroadcastSet("messagingBcastSet");
+                       Assert.assertEquals(4, bcastSet.get(0));
+                       Assert.assertEquals(5, bcastSet.get(1));
+                       Assert.assertEquals(6, bcastSet.get(2));
 
                        // test number of vertices
                        Assert.assertEquals(5, getNumberOfVertices());
-                       
+
+                       // test aggregator
+                       if (getSuperstepNumber() == 2) {
+                               long aggrValue = 
((LongValue)getPreviousIterationAggregate("superstepAggregator")).getValue();
+                               Assert.assertEquals(5, aggrValue);
+                       }
                }
 
                @Override
-               public void updateVertex(Vertex<Long, Long> vertex, 
MessageIterator<Long> inMessages) {
-                       long superstep = getSuperstepNumber();
-                       aggregator.aggregate(superstep);
-
-                       setNewVertexValue(vertex.getValue() + 1);
+               public void sendMessages(Vertex<Long, Long> vertex) {
+                       //send message to keep vertices active
+                       sendMessageToAllNeighbors(vertex.getValue());
                }
        }
 
        @SuppressWarnings("serial")
-       public static final class UpdateFunctionDefault extends 
VertexUpdateFunction<Long, Long, Long> {
+       private static final class MessageFunctionDefault extends 
ScatterFunction<Long, Long, Long, Long> {
 
                @Override
-               public void updateVertex(Vertex<Long, Long> vertex, 
MessageIterator<Long> inMessages) {
-
+               public void sendMessages(Vertex<Long, Long> vertex) {
                        // test number of vertices
                        Assert.assertEquals(-1, getNumberOfVertices());
 
                        // test degrees
                        Assert.assertEquals(-1, getInDegree());
                        Assert.assertEquals(-1, getOutDegree());
-
-                       setNewVertexValue(vertex.getValue() + 1);
+                       //send message to keep vertices active
+                       sendMessageToAllNeighbors(vertex.getValue());
                }
        }
-       
+
        @SuppressWarnings("serial")
-       public static final class MessageFunction extends 
MessagingFunction<Long, Long, Long, Long> {
+       private static final class UpdateFunction extends GatherFunction<Long, 
Long, Long> {
+
+               LongSumAggregator aggregator = new LongSumAggregator();
 
                @Override
                public void preSuperstep() {
-                       
+
                        // test bcast variable
                        @SuppressWarnings("unchecked")
-                       List<Tuple1<Integer>> bcastSet = 
(List<Tuple1<Integer>>)(List<?>)getBroadcastSet("messagingBcastSet");
-                       Assert.assertEquals(4, bcastSet.get(0));
-                       Assert.assertEquals(5, bcastSet.get(1));
-                       Assert.assertEquals(6, bcastSet.get(2));
+                       List<Tuple1<Integer>> bcastSet = 
(List<Tuple1<Integer>>)(List<?>)getBroadcastSet("updateBcastSet");
+                       Assert.assertEquals(1, bcastSet.get(0));
+                       Assert.assertEquals(2, bcastSet.get(1));
+                       Assert.assertEquals(3, bcastSet.get(2));
+
+                       // test aggregator
+                       aggregator = 
getIterationAggregator("superstepAggregator");
 
                        // test number of vertices
                        Assert.assertEquals(5, getNumberOfVertices());
-                       
-                       // test aggregator
-                       if (getSuperstepNumber() == 2) {
-                               long aggrValue = 
((LongValue)getPreviousIterationAggregate("superstepAggregator")).getValue();
-                               Assert.assertEquals(5, aggrValue);
-                       }
+
                }
 
                @Override
-               public void sendMessages(Vertex<Long, Long> vertex) {
-                       //send message to keep vertices active
-                       sendMessageToAllNeighbors(vertex.getValue());
+               public void updateVertex(Vertex<Long, Long> vertex, 
MessageIterator<Long> inMessages) {
+                       long superstep = getSuperstepNumber();
+                       aggregator.aggregate(superstep);
+
+                       setNewVertexValue(vertex.getValue() + 1);
                }
        }
 
        @SuppressWarnings("serial")
-       public static final class MessageFunctionDefault extends 
MessagingFunction<Long, Long, Long, Long> {
+       private static final class UpdateFunctionDefault extends 
GatherFunction<Long, Long, Long> {
 
                @Override
-               public void sendMessages(Vertex<Long, Long> vertex) {
+               public void updateVertex(Vertex<Long, Long> vertex, 
MessageIterator<Long> inMessages) {
+
                        // test number of vertices
                        Assert.assertEquals(-1, getNumberOfVertices());
 
                        // test degrees
                        Assert.assertEquals(-1, getInDegree());
                        Assert.assertEquals(-1, getOutDegree());
-                       //send message to keep vertices active
-                       sendMessageToAllNeighbors(vertex.getValue());
+
+                       setNewVertexValue(vertex.getValue() + 1);
                }
        }
 
        @SuppressWarnings("serial")
-       public static final class UpdateFunctionNumVertices extends 
VertexUpdateFunction<Long, Long, Long> {
+       private static final class UpdateFunctionNumVertices extends 
GatherFunction<Long, Long, Long> {
 
                @Override
                public void updateVertex(Vertex<Long, Long> vertex, 
MessageIterator<Long> inMessages) {
@@ -618,17 +618,16 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
        }
 
        @SuppressWarnings("serial")
-       public static final class DummyUpdateFunction extends 
VertexUpdateFunction<Long, Long, Long> {
+       private static final class DummyUpdateFunction extends 
GatherFunction<Long, Long, Long> {
 
                @Override
                public void updateVertex(Vertex<Long, Long> vertex, 
MessageIterator<Long> inMessages) {
                        setNewVertexValue(vertex.getValue() + 1);
                }
        }
-       
-       @SuppressWarnings("serial")
-       public static final class DummyMessageFunction extends 
MessagingFunction<Long, Long, Long, Long> {
 
+       @SuppressWarnings("serial")
+       private static final class DummyMessageFunction extends 
ScatterFunction<Long, Long, Long, Long> {
                @Override
                public void sendMessages(Vertex<Long, Long> vertex) {
                        //send message to keep vertices active
@@ -637,7 +636,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
        }
 
        @SuppressWarnings("serial")
-       public static final class DegreesMessageFunction extends 
MessagingFunction<Long, Long, Long, Long> {
+       private static final class DegreesMessageFunction extends 
ScatterFunction<Long, Long, Long, Long> {
 
                @Override
                public void sendMessages(Vertex<Long, Long> vertex) {
@@ -655,7 +654,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
        }
 
        @SuppressWarnings("serial")
-       public static final class VertexUpdateDirection extends 
VertexUpdateFunction<Long, HashSet<Long>, Long> {
+       private static final class VertexUpdateDirection extends 
GatherFunction<Long, HashSet<Long>, Long> {
 
                @Override
                public void updateVertex(Vertex<Long, HashSet<Long>> vertex, 
MessageIterator<Long> messages) throws Exception {
@@ -670,7 +669,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
        }
 
        @SuppressWarnings("serial")
-       public static final class UpdateFunctionInDegrees extends 
VertexUpdateFunction<Long, Long, Long> {
+       private static final class UpdateFunctionInDegrees extends 
GatherFunction<Long, Long, Long> {
 
                @Override
                public void updateVertex(Vertex<Long, Long> vertex, 
MessageIterator<Long> inMessages) {
@@ -680,7 +679,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
        }
 
        @SuppressWarnings("serial")
-       public static final class UpdateFunctionOutDegrees extends 
VertexUpdateFunction<Long, Long, Long> {
+       private static final class UpdateFunctionOutDegrees extends 
GatherFunction<Long, Long, Long> {
 
                @Override
                public void updateVertex(Vertex<Long, Long> vertex, 
MessageIterator<Long> inMessages) {
@@ -690,7 +689,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
        }
 
        @SuppressWarnings("serial")
-       public static final class VertexUpdateNumNeighbors extends 
VertexUpdateFunction<Long, Boolean,
+       private static final class VertexUpdateNumNeighbors extends 
GatherFunction<Long, Boolean,
                        Long> {
 
                @Override
@@ -706,7 +705,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
        }
 
        @SuppressWarnings("serial")
-       public static final class UpdateFunctionDegrees extends 
VertexUpdateFunction<Long, Long, Long> {
+       private static final class UpdateFunctionDegrees extends 
GatherFunction<Long, Long, Long> {
 
                @Override
                public void updateVertex(Vertex<Long, Long> vertex, 
MessageIterator<Long> inMessages) {
@@ -717,7 +716,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
        }
 
        @SuppressWarnings("serial")
-       public static final class IdMessengerSrc extends 
MessagingFunction<Long, HashSet<Long>, Long, Long> {
+       private static final class IdMessengerSrc extends ScatterFunction<Long, 
HashSet<Long>, Long, Long> {
 
                @Override
                public void sendMessages(Vertex<Long, HashSet<Long>> vertex) 
throws Exception {
@@ -728,7 +727,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
        }
 
        @SuppressWarnings("serial")
-       public static final class IdMessengerAll extends 
MessagingFunction<Long, HashSet<Long>, Long, Long> {
+       private static final class IdMessengerAll extends ScatterFunction<Long, 
HashSet<Long>, Long, Long> {
 
                @Override
                public void sendMessages(Vertex<Long, HashSet<Long>> vertex) 
throws Exception {
@@ -743,7 +742,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
        }
 
        @SuppressWarnings("serial")
-       public static final class SendMsgToAll extends MessagingFunction<Long, 
HashSet<Long>, Long, Long> {
+       private static final class SendMsgToAll extends ScatterFunction<Long, 
HashSet<Long>, Long, Long> {
 
                @Override
                public void sendMessages(Vertex<Long, HashSet<Long>> vertex) 
throws Exception {
@@ -752,7 +751,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
        }
 
        @SuppressWarnings("serial")
-       public static final class IdMessenger extends MessagingFunction<Long, 
Boolean, Long, Long> {
+       private static final class IdMessenger extends ScatterFunction<Long, 
Boolean, Long, Long> {
 
                @Override
                public void sendMessages(Vertex<Long, Boolean> vertex) throws 
Exception {
@@ -767,7 +766,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
        }
 
        @SuppressWarnings("serial")
-       public static final class IdMessengerTrg extends 
MessagingFunction<Long, HashSet<Long>, Long, Long> {
+       private static final class IdMessengerTrg extends ScatterFunction<Long, 
HashSet<Long>, Long, Long> {
 
                @Override
                public void sendMessages(Vertex<Long, HashSet<Long>> vertex) 
throws Exception {
@@ -778,7 +777,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
        }
 
        @SuppressWarnings("serial")
-       public static final class AssignOneMapper implements 
MapFunction<Vertex<Long, Long>, Long> {
+       private static final class AssignOneMapper implements 
MapFunction<Vertex<Long, Long>, Long> {
 
                public Long map(Vertex<Long, Long> value) {
                        return 1L;
@@ -786,7 +785,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
        }
 
        @SuppressWarnings("serial")
-       public static final class InitialiseHashSetMapper implements 
MapFunction<Vertex<Long, Long>, HashSet<Long>> {
+       private static final class InitialiseHashSetMapper implements 
MapFunction<Vertex<Long, Long>, HashSet<Long>> {
 
                @Override
                public HashSet<Long> map(Vertex<Long, Long> value) throws 
Exception {

Reply via email to