Repository: flink Updated Branches: refs/heads/master 6c6b17b4d -> 7324b9c17
http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java index 7755708..2c7e093 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java @@ -16,28 +16,27 @@ * limitations under the License. */ - package org.apache.flink.graph.spargel; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import org.junit.Test; import org.apache.flink.api.common.aggregators.LongSumAggregator; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.operators.DeltaIterationResultSet; -import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.TwoInputUdfOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; @SuppressWarnings("serial") public class SpargelTranslationTest { @@ -46,28 +45,28 @@ public class SpargelTranslationTest { public void testTranslationPlainEdges() { try { final String ITERATION_NAME = "Test Name"; - + final String AGGREGATOR_NAME = "AggregatorName"; - + final String BC_SET_MESSAGES_NAME = "borat messages"; - + final String BC_SET_UPDATES_NAME = "borat updates"; final int NUM_ITERATIONS = 13; - + final int ITERATION_parallelism = 77; - - + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + DataSet<Long> bcMessaging = env.fromElements(1L); DataSet<Long> bcUpdate = env.fromElements(1L); - + DataSet<Vertex<String, Double>> result; - + // ------------ construct the test program ------------------ { - + DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<>("abc", 3.44)); DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<>("a", "c")); @@ -83,41 +82,41 @@ public class SpargelTranslationTest { ScatterGatherConfiguration parameters = new ScatterGatherConfiguration(); - parameters.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcMessaging); - parameters.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcUpdate); + parameters.addBroadcastSetForScatterFunction(BC_SET_MESSAGES_NAME, bcMessaging); + parameters.addBroadcastSetForGatherFunction(BC_SET_UPDATES_NAME, bcUpdate); parameters.setName(ITERATION_NAME); parameters.setParallelism(ITERATION_parallelism); parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator()); - result = graph.runScatterGatherIteration(new UpdateFunction(), new MessageFunctionNoEdgeValue(), + result = graph.runScatterGatherIteration(new MessageFunctionNoEdgeValue(), new UpdateFunction(), NUM_ITERATIONS, parameters).getVertices(); result.output(new DiscardingOutputFormat<Vertex<String, Double>>()); } - - + + // ------------- validate the java program ---------------- - + assertTrue(result instanceof DeltaIterationResultSet); - + DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result; DeltaIteration<?, ?> iteration = resultSet.getIterationHead(); - + // check the basic iteration properties assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations()); assertArrayEquals(new int[] {0}, resultSet.getKeyPositions()); assertEquals(ITERATION_parallelism, iteration.getParallelism()); assertEquals(ITERATION_NAME, iteration.getName()); - + assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName()); - + // validate that the semantic properties are set as they should TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset(); assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0)); assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0)); - + TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1(); - + // validate that the broadcast sets are forwarded assertEquals(bcUpdate, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME)); assertEquals(bcMessaging, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME)); @@ -128,29 +127,29 @@ public class SpargelTranslationTest { fail(e.getMessage()); } } - + @Test public void testTranslationPlainEdgesWithForkedBroadcastVariable() { try { final String ITERATION_NAME = "Test Name"; - + final String AGGREGATOR_NAME = "AggregatorName"; - + final String BC_SET_MESSAGES_NAME = "borat messages"; - + final String BC_SET_UPDATES_NAME = "borat updates"; final int NUM_ITERATIONS = 13; - + final int ITERATION_parallelism = 77; - - + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + DataSet<Long> bcVar = env.fromElements(1L); - + DataSet<Vertex<String, Double>> result; - + // ------------ construct the test program ------------------ { @@ -169,41 +168,41 @@ public class SpargelTranslationTest { ScatterGatherConfiguration parameters = new ScatterGatherConfiguration(); - parameters.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcVar); - parameters.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcVar); + parameters.addBroadcastSetForScatterFunction(BC_SET_MESSAGES_NAME, bcVar); + parameters.addBroadcastSetForGatherFunction(BC_SET_UPDATES_NAME, bcVar); parameters.setName(ITERATION_NAME); parameters.setParallelism(ITERATION_parallelism); parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator()); - - result = graph.runScatterGatherIteration(new UpdateFunction(), new MessageFunctionNoEdgeValue(), + + result = graph.runScatterGatherIteration(new MessageFunctionNoEdgeValue(), new UpdateFunction(), NUM_ITERATIONS, parameters).getVertices(); result.output(new DiscardingOutputFormat<Vertex<String, Double>>()); } - - + + // ------------- validate the java program ---------------- - + assertTrue(result instanceof DeltaIterationResultSet); - + DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result; DeltaIteration<?, ?> iteration = resultSet.getIterationHead(); - + // check the basic iteration properties assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations()); assertArrayEquals(new int[] {0}, resultSet.getKeyPositions()); assertEquals(ITERATION_parallelism, iteration.getParallelism()); assertEquals(ITERATION_NAME, iteration.getName()); - + assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName()); - + // validate that the semantic properties are set as they should TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset(); assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0)); assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0)); - + TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1(); - + // validate that the broadcast sets are forwarded assertEquals(bcVar, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME)); assertEquals(bcVar, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME)); @@ -214,18 +213,18 @@ public class SpargelTranslationTest { fail(e.getMessage()); } } - + // -------------------------------------------------------------------------------------------- - - public static class UpdateFunction extends VertexUpdateFunction<String, Double, Long> { + + private static class MessageFunctionNoEdgeValue extends ScatterFunction<String, Double, Long, NullValue> { @Override - public void updateVertex(Vertex<String, Double> vertex, MessageIterator<Long> inMessages) {} + public void sendMessages(Vertex<String, Double> vertex) {} } - - public static class MessageFunctionNoEdgeValue extends MessagingFunction<String, Double, Long, NullValue> { + + private static class UpdateFunction extends GatherFunction<String, Double, Long> { @Override - public void sendMessages(Vertex<String, Double> vertex) {} + public void updateVertex(Vertex<String, Double> vertex, MessageIterator<Long> inMessages) {} } } http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java index b1c2a2c..cb7573c 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java @@ -24,9 +24,9 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.spargel.GatherFunction; import org.apache.flink.graph.spargel.MessageIterator; -import org.apache.flink.graph.spargel.MessagingFunction; -import org.apache.flink.graph.spargel.VertexUpdateFunction; +import org.apache.flink.graph.spargel.ScatterFunction; import org.apache.flink.graph.utils.VertexToTuple2Map; import org.junit.Assert; import org.junit.Test; @@ -36,47 +36,46 @@ public class CollectionModeSuperstepITCase { /** * Dummy iteration to test that the supersteps are correctly incremented - * and can be retrieved from inside the updated and messaging functions. + * and can be retrieved from inside the scatter and gather functions. * All vertices start with value 1 and increase their value by 1 * in each iteration. */ @Test public void testProgram() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), + + Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper()); - + Graph<Long, Long, Long> result = graph.runScatterGatherIteration( - new UpdateFunction(), new MessageFunction(), 10); + new MessageFunction(), new UpdateFunction(), 10); result.getVertices().map( new VertexToTuple2Map<Long, Long>()).output( new DiscardingOutputFormat<Tuple2<Long, Long>>()); env.execute(); } - - public static final class UpdateFunction extends VertexUpdateFunction<Long, Long, Long> { + + private static final class MessageFunction extends ScatterFunction<Long, Long, Long, Long> { @Override - public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) { + public void sendMessages(Vertex<Long, Long> vertex) { long superstep = getSuperstepNumber(); Assert.assertEquals(true, vertex.getValue() == superstep); - setNewVertexValue(vertex.getValue() + 1); + //send message to keep vertices active + sendMessageToAllNeighbors(vertex.getValue()); } } - - public static final class MessageFunction extends MessagingFunction<Long, Long, Long, Long> { + + private static final class UpdateFunction extends GatherFunction<Long, Long, Long> { @Override - public void sendMessages(Vertex<Long, Long> vertex) { + public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) { long superstep = getSuperstepNumber(); Assert.assertEquals(true, vertex.getValue() == superstep); - //send message to keep vertices active - sendMessageToAllNeighbors(vertex.getValue()); + setNewVertexValue(vertex.getValue() + 1); } } - public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> { - + private static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> { public Long map(Vertex<Long, Long> value) { return 1L; } http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java index f14e002..fcd0d82 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java @@ -18,9 +18,6 @@ package org.apache.flink.graph.test; -import java.util.HashSet; -import java.util.List; - import org.apache.flink.api.common.aggregators.LongSumAggregator; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; @@ -31,18 +28,21 @@ import org.apache.flink.graph.Edge; import org.apache.flink.graph.EdgeDirection; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.spargel.GatherFunction; import org.apache.flink.graph.spargel.MessageIterator; -import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.ScatterFunction; import org.apache.flink.graph.spargel.ScatterGatherConfiguration; import org.apache.flink.graph.spargel.ScatterGatherIteration; -import org.apache.flink.graph.spargel.VertexUpdateFunction; +import org.apache.flink.graph.utils.VertexToTuple2Map; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.types.LongValue; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.apache.flink.graph.utils.VertexToTuple2Map; + +import java.util.HashSet; +import java.util.List; @RunWith(Parameterized.class) public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { @@ -59,30 +59,30 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { * Test Graph's runScatterGatherIteration when configuration parameters are provided */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), + + Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper()); // create the configuration object ScatterGatherConfiguration parameters = new ScatterGatherConfiguration(); - parameters.addBroadcastSetForUpdateFunction("updateBcastSet", env.fromElements(1, 2, 3)); - parameters.addBroadcastSetForMessagingFunction("messagingBcastSet", env.fromElements(4, 5, 6)); + parameters.addBroadcastSetForScatterFunction("messagingBcastSet", env.fromElements(4, 5, 6)); + parameters.addBroadcastSetForGatherFunction("updateBcastSet", env.fromElements(1, 2, 3)); parameters.registerAggregator("superstepAggregator", new LongSumAggregator()); parameters.setOptNumVertices(true); Graph<Long, Long, Long> res = graph.runScatterGatherIteration( - new UpdateFunction(), new MessageFunction(), 10, parameters); + new MessageFunction(), new UpdateFunction(), 10, parameters); DataSet<Vertex<Long,Long>> data = res.getVertices(); List<Vertex<Long,Long>> result= data.collect(); - + expectedResult = "1,11\n" + "2,11\n" + "3,11\n" + "4,11\n" + "5,11"; - + compareResultAsTuples(result, expectedResult); } @@ -95,29 +95,29 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ScatterGatherIteration<Long, Long, Long, Long> iteration = ScatterGatherIteration - .withEdges(TestGraphUtils.getLongLongEdgeData(env), new DummyUpdateFunction(), - new DummyMessageFunction(), 10); - + .withEdges(TestGraphUtils.getLongLongEdgeData(env), new DummyMessageFunction(), + new DummyUpdateFunction(), 10); + ScatterGatherConfiguration parameters = new ScatterGatherConfiguration(); parameters.setName("gelly iteration"); parameters.setParallelism(2); parameters.setSolutionSetUnmanagedMemory(true); - + iteration.configure(parameters); - + Assert.assertEquals("gelly iteration", iteration.getIterationConfiguration().getName("")); Assert.assertEquals(2, iteration.getIterationConfiguration().getParallelism()); Assert.assertEquals(true, iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory()); DataSet<Vertex<Long, Long>> data = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration); List<Vertex<Long,Long>> result= data.collect(); - + expectedResult = "1,11\n" + "2,12\n" + "3,13\n" + "4,14\n" + "5,15"; - + compareResultAsTuples(result, expectedResult); } @@ -128,23 +128,23 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { * i.e. degrees and numVertices will be -1, EdgeDirection will be OUT. */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), + + Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper()); Graph<Long, Long, Long> res = graph.runScatterGatherIteration( - new UpdateFunctionDefault(), new MessageFunctionDefault(), 5); + new MessageFunctionDefault(), new UpdateFunctionDefault(), 5); + - DataSet<Tuple2<Long, Long>> data = res.getVertices().map(new VertexToTuple2Map<Long, Long>()); List<Tuple2<Long, Long>> result= data.collect(); - + expectedResult = "1,6\n" + "2,6\n" + "3,6\n" + "4,6\n" + "5,6"; - + compareResultAsTuples(result, expectedResult); } @@ -162,7 +162,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { .mapVertices(new InitialiseHashSetMapper()); DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph - .runScatterGatherIteration(new VertexUpdateDirection(), new IdMessengerTrg(), 5) + .runScatterGatherIteration(new IdMessengerTrg(), new VertexUpdateDirection(), 5) .getVertices(); List<Vertex<Long, HashSet<Long>>> result= resultedVertices.collect(); @@ -172,7 +172,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { "3,[1, 2]\n" + "4,[3]\n" + "5,[3, 4]"; - + compareResultAsTuples(result, expectedResult); } @@ -195,7 +195,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { parameters.setDirection(EdgeDirection.IN); DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph - .runScatterGatherIteration(new VertexUpdateDirection(), new IdMessengerSrc(), 5, parameters) + .runScatterGatherIteration(new IdMessengerSrc(), new VertexUpdateDirection(), 5, parameters) .getVertices(); List<Vertex<Long, HashSet<Long>>> result= resultedVertices.collect(); @@ -205,7 +205,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { "3,[4, 5]\n" + "4,[5]\n" + "5,[1]"; - + compareResultAsTuples(result, expectedResult); } @@ -228,7 +228,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { parameters.setDirection(EdgeDirection.ALL); DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph - .runScatterGatherIteration(new VertexUpdateDirection(), new IdMessengerAll(), 5, parameters) + .runScatterGatherIteration(new IdMessengerAll(), new VertexUpdateDirection(), 5, parameters) .getVertices(); List<Vertex<Long, HashSet<Long>>> result= resultedVertices.collect(); @@ -238,7 +238,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { "3,[1, 2, 4, 5]\n" + "4,[3, 5]\n" + "5,[1, 3, 4]"; - + compareResultAsTuples(result, expectedResult); } @@ -261,7 +261,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { parameters.setDirection(EdgeDirection.IN); DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph - .runScatterGatherIteration(new VertexUpdateDirection(), new SendMsgToAll(), 5, parameters) + .runScatterGatherIteration(new SendMsgToAll(), new VertexUpdateDirection(), 5, parameters) .getVertices(); List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect(); @@ -271,7 +271,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { "3,[4, 5]\n" + "4,[5]\n" + "5,[1]"; - + compareResultAsTuples(result, expectedResult); } @@ -294,7 +294,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { parameters.setDirection(EdgeDirection.OUT); DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph - .runScatterGatherIteration(new VertexUpdateDirection(), new SendMsgToAll(), 5, parameters) + .runScatterGatherIteration(new SendMsgToAll(), new VertexUpdateDirection(), 5, parameters) .getVertices(); List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect(); @@ -304,7 +304,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { "3,[1, 2]\n" + "4,[3]\n" + "5,[3, 4]"; - + compareResultAsTuples(result, expectedResult); } @@ -327,7 +327,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { parameters.setDirection(EdgeDirection.ALL); DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph - .runScatterGatherIteration(new VertexUpdateDirection(), new SendMsgToAll(), 5, parameters) + .runScatterGatherIteration(new SendMsgToAll(), new VertexUpdateDirection(), 5, parameters) .getVertices(); List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect(); @@ -337,7 +337,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { "3,[1, 2, 4, 5]\n" + "4,[3, 5]\n" + "5,[1, 3, 4]"; - + compareResultAsTuples(result, expectedResult); } @@ -353,8 +353,8 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env); - DataSet<Vertex<Long, Long>> verticesWithNumVertices = graph.runScatterGatherIteration(new UpdateFunctionNumVertices(), - new DummyMessageFunction(), 2).getVertices(); + DataSet<Vertex<Long, Long>> verticesWithNumVertices = graph.runScatterGatherIteration(new DummyMessageFunction(), + new UpdateFunctionNumVertices(), 2).getVertices(); List<Vertex<Long, Long>> result= verticesWithNumVertices.collect(); @@ -363,7 +363,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { "3,-1\n" + "4,-1\n" + "5,-1"; - + compareResultAsTuples(result, expectedResult); } @@ -371,9 +371,9 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { public void testInDegreesSet() throws Exception { /* - * Test that if the degrees are set, they can be accessed in every superstep + * Test that if the degrees are set, they can be accessed in every superstep * inside the update function and the value - * is correctly computed for degrees in the messaging function. + * is correctly computed for degrees in the scatter function. */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -386,7 +386,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { parameters.setOptDegrees(true); DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runScatterGatherIteration( - new UpdateFunctionInDegrees(), new DegreesMessageFunction(), 5, parameters).getVertices(); + new DegreesMessageFunction(), new UpdateFunctionInDegrees(), 5, parameters).getVertices(); List<Vertex<Long, Long>> result= verticesWithDegrees.collect(); @@ -395,7 +395,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { "3,2\n" + "4,1\n" + "5,2"; - + compareResultAsTuples(result, expectedResult); } @@ -411,7 +411,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { TestGraphUtils.getLongLongEdges(), env); DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runScatterGatherIteration( - new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2).getVertices(); + new DummyMessageFunction(), new UpdateFunctionInDegrees(), 2).getVertices(); List<Vertex<Long, Long>> result= verticesWithDegrees.collect(); @@ -420,7 +420,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { "3,-1\n" + "4,-1\n" + "5,-1"; - + compareResultAsTuples(result, expectedResult); } @@ -430,7 +430,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { /* * Test that if the degrees are set, they can be accessed in every superstep * inside the update function and the value - * is correctly computed for degrees in the messaging function. + * is correctly computed for degrees in the scatter function. */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -443,7 +443,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { parameters.setOptDegrees(true); DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runScatterGatherIteration( - new UpdateFunctionOutDegrees(), new DegreesMessageFunction(), 5, parameters).getVertices(); + new DegreesMessageFunction(), new UpdateFunctionOutDegrees(), 5, parameters).getVertices(); List<Vertex<Long, Long>> result= verticesWithDegrees.collect(); @@ -452,7 +452,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { "3,2\n" + "4,1\n" + "5,1"; - + compareResultAsTuples(result, expectedResult); } @@ -468,7 +468,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { TestGraphUtils.getLongLongEdges(), env); DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runScatterGatherIteration( - new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2).getVertices(); + new DummyMessageFunction(), new UpdateFunctionInDegrees(), 2).getVertices(); List<Vertex<Long, Long>> result= verticesWithDegrees.collect(); @@ -477,7 +477,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { "3,-1\n" + "4,-1\n" + "5,-1"; - + compareResultAsTuples(result, expectedResult); } @@ -500,7 +500,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { parameters.setDirection(EdgeDirection.ALL); DataSet<Vertex<Long, Boolean>> verticesWithNumNeighbors = graph.runScatterGatherIteration( - new VertexUpdateNumNeighbors(), new IdMessenger(), 1, parameters).getVertices(); + new IdMessenger(), new VertexUpdateNumNeighbors(), 1, parameters).getVertices(); List<Vertex<Long, Boolean>> result= verticesWithNumNeighbors.collect(); @@ -509,107 +509,107 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { "3,true\n" + "4,true\n" + "5,true"; - + compareResultAsTuples(result, expectedResult); } @SuppressWarnings("serial") - public static final class UpdateFunction extends VertexUpdateFunction<Long, Long, Long> { - - LongSumAggregator aggregator = new LongSumAggregator(); + private static final class MessageFunction extends ScatterFunction<Long, Long, Long, Long> { @Override public void preSuperstep() { - + // test bcast variable @SuppressWarnings("unchecked") - List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("updateBcastSet"); - Assert.assertEquals(1, bcastSet.get(0)); - Assert.assertEquals(2, bcastSet.get(1)); - Assert.assertEquals(3, bcastSet.get(2)); - - // test aggregator - aggregator = getIterationAggregator("superstepAggregator"); + List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("messagingBcastSet"); + Assert.assertEquals(4, bcastSet.get(0)); + Assert.assertEquals(5, bcastSet.get(1)); + Assert.assertEquals(6, bcastSet.get(2)); // test number of vertices Assert.assertEquals(5, getNumberOfVertices()); - + + // test aggregator + if (getSuperstepNumber() == 2) { + long aggrValue = ((LongValue)getPreviousIterationAggregate("superstepAggregator")).getValue(); + Assert.assertEquals(5, aggrValue); + } } @Override - public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) { - long superstep = getSuperstepNumber(); - aggregator.aggregate(superstep); - - setNewVertexValue(vertex.getValue() + 1); + public void sendMessages(Vertex<Long, Long> vertex) { + //send message to keep vertices active + sendMessageToAllNeighbors(vertex.getValue()); } } @SuppressWarnings("serial") - public static final class UpdateFunctionDefault extends VertexUpdateFunction<Long, Long, Long> { + private static final class MessageFunctionDefault extends ScatterFunction<Long, Long, Long, Long> { @Override - public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) { - + public void sendMessages(Vertex<Long, Long> vertex) { // test number of vertices Assert.assertEquals(-1, getNumberOfVertices()); // test degrees Assert.assertEquals(-1, getInDegree()); Assert.assertEquals(-1, getOutDegree()); - - setNewVertexValue(vertex.getValue() + 1); + //send message to keep vertices active + sendMessageToAllNeighbors(vertex.getValue()); } } - + @SuppressWarnings("serial") - public static final class MessageFunction extends MessagingFunction<Long, Long, Long, Long> { + private static final class UpdateFunction extends GatherFunction<Long, Long, Long> { + + LongSumAggregator aggregator = new LongSumAggregator(); @Override public void preSuperstep() { - + // test bcast variable @SuppressWarnings("unchecked") - List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("messagingBcastSet"); - Assert.assertEquals(4, bcastSet.get(0)); - Assert.assertEquals(5, bcastSet.get(1)); - Assert.assertEquals(6, bcastSet.get(2)); + List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("updateBcastSet"); + Assert.assertEquals(1, bcastSet.get(0)); + Assert.assertEquals(2, bcastSet.get(1)); + Assert.assertEquals(3, bcastSet.get(2)); + + // test aggregator + aggregator = getIterationAggregator("superstepAggregator"); // test number of vertices Assert.assertEquals(5, getNumberOfVertices()); - - // test aggregator - if (getSuperstepNumber() == 2) { - long aggrValue = ((LongValue)getPreviousIterationAggregate("superstepAggregator")).getValue(); - Assert.assertEquals(5, aggrValue); - } + } @Override - public void sendMessages(Vertex<Long, Long> vertex) { - //send message to keep vertices active - sendMessageToAllNeighbors(vertex.getValue()); + public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) { + long superstep = getSuperstepNumber(); + aggregator.aggregate(superstep); + + setNewVertexValue(vertex.getValue() + 1); } } @SuppressWarnings("serial") - public static final class MessageFunctionDefault extends MessagingFunction<Long, Long, Long, Long> { + private static final class UpdateFunctionDefault extends GatherFunction<Long, Long, Long> { @Override - public void sendMessages(Vertex<Long, Long> vertex) { + public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) { + // test number of vertices Assert.assertEquals(-1, getNumberOfVertices()); // test degrees Assert.assertEquals(-1, getInDegree()); Assert.assertEquals(-1, getOutDegree()); - //send message to keep vertices active - sendMessageToAllNeighbors(vertex.getValue()); + + setNewVertexValue(vertex.getValue() + 1); } } @SuppressWarnings("serial") - public static final class UpdateFunctionNumVertices extends VertexUpdateFunction<Long, Long, Long> { + private static final class UpdateFunctionNumVertices extends GatherFunction<Long, Long, Long> { @Override public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) { @@ -618,17 +618,16 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - public static final class DummyUpdateFunction extends VertexUpdateFunction<Long, Long, Long> { + private static final class DummyUpdateFunction extends GatherFunction<Long, Long, Long> { @Override public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) { setNewVertexValue(vertex.getValue() + 1); } } - - @SuppressWarnings("serial") - public static final class DummyMessageFunction extends MessagingFunction<Long, Long, Long, Long> { + @SuppressWarnings("serial") + private static final class DummyMessageFunction extends ScatterFunction<Long, Long, Long, Long> { @Override public void sendMessages(Vertex<Long, Long> vertex) { //send message to keep vertices active @@ -637,7 +636,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - public static final class DegreesMessageFunction extends MessagingFunction<Long, Long, Long, Long> { + private static final class DegreesMessageFunction extends ScatterFunction<Long, Long, Long, Long> { @Override public void sendMessages(Vertex<Long, Long> vertex) { @@ -655,7 +654,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - public static final class VertexUpdateDirection extends VertexUpdateFunction<Long, HashSet<Long>, Long> { + private static final class VertexUpdateDirection extends GatherFunction<Long, HashSet<Long>, Long> { @Override public void updateVertex(Vertex<Long, HashSet<Long>> vertex, MessageIterator<Long> messages) throws Exception { @@ -670,7 +669,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - public static final class UpdateFunctionInDegrees extends VertexUpdateFunction<Long, Long, Long> { + private static final class UpdateFunctionInDegrees extends GatherFunction<Long, Long, Long> { @Override public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) { @@ -680,7 +679,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - public static final class UpdateFunctionOutDegrees extends VertexUpdateFunction<Long, Long, Long> { + private static final class UpdateFunctionOutDegrees extends GatherFunction<Long, Long, Long> { @Override public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) { @@ -690,7 +689,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - public static final class VertexUpdateNumNeighbors extends VertexUpdateFunction<Long, Boolean, + private static final class VertexUpdateNumNeighbors extends GatherFunction<Long, Boolean, Long> { @Override @@ -706,7 +705,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - public static final class UpdateFunctionDegrees extends VertexUpdateFunction<Long, Long, Long> { + private static final class UpdateFunctionDegrees extends GatherFunction<Long, Long, Long> { @Override public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) { @@ -717,7 +716,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - public static final class IdMessengerSrc extends MessagingFunction<Long, HashSet<Long>, Long, Long> { + private static final class IdMessengerSrc extends ScatterFunction<Long, HashSet<Long>, Long, Long> { @Override public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception { @@ -728,7 +727,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - public static final class IdMessengerAll extends MessagingFunction<Long, HashSet<Long>, Long, Long> { + private static final class IdMessengerAll extends ScatterFunction<Long, HashSet<Long>, Long, Long> { @Override public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception { @@ -743,7 +742,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - public static final class SendMsgToAll extends MessagingFunction<Long, HashSet<Long>, Long, Long> { + private static final class SendMsgToAll extends ScatterFunction<Long, HashSet<Long>, Long, Long> { @Override public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception { @@ -752,7 +751,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - public static final class IdMessenger extends MessagingFunction<Long, Boolean, Long, Long> { + private static final class IdMessenger extends ScatterFunction<Long, Boolean, Long, Long> { @Override public void sendMessages(Vertex<Long, Boolean> vertex) throws Exception { @@ -767,7 +766,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - public static final class IdMessengerTrg extends MessagingFunction<Long, HashSet<Long>, Long, Long> { + private static final class IdMessengerTrg extends ScatterFunction<Long, HashSet<Long>, Long, Long> { @Override public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception { @@ -778,7 +777,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> { + private static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> { public Long map(Vertex<Long, Long> value) { return 1L; @@ -786,7 +785,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - public static final class InitialiseHashSetMapper implements MapFunction<Vertex<Long, Long>, HashSet<Long>> { + private static final class InitialiseHashSetMapper implements MapFunction<Vertex<Long, Long>, HashSet<Long>> { @Override public HashSet<Long> map(Vertex<Long, Long> value) throws Exception {