[FLINK-3618] [gelly] Rename abstract UDF classes in Scatter-Gather implementation
Rename MessagingFunction to ScatterFunction and VertexUpdateFunction to GatherFunction.

Change the parameter order in Graph.runScatterGatherIteration(VertexUpdateFunction, MessagingFunction) to Graph.runScatterGatherIteration(ScatterFunction, GatherFunction).

This closes #2184

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/918e5d0c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/918e5d0c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/918e5d0c

Branch: refs/heads/master
Commit: 918e5d0c9dae7a080dcf2505df38af4c655ba6b6
Parents: 6c6b17b
Author: Greg Hogan <c...@greghogan.com>
Authored: Fri May 6 15:39:38 2016 -0400
Committer: Greg Hogan <c...@greghogan.com>
Committed: Thu Jun 30 13:04:32 2016 -0400

----------------------------------------------------------------------
docs/apis/batch/libs/gelly.md | 78 ++--
.../flink/graph/examples/HITSAlgorithm.java | 112 +++--
.../flink/graph/examples/IncrementalSSSP.java | 40 +-
.../examples/SingleSourceShortestPaths.java | 42 +-
.../examples/SingleSourceShortestPaths.scala | 47 ++-
.../test/examples/IncrementalSSSPITCase.java | 2 +-
.../org/apache/flink/graph/scala/Graph.scala | 318 +++++++-------
.../main/java/org/apache/flink/graph/Graph.java | 30 +-
.../flink/graph/IterationConfiguration.java | 5 +-
.../flink/graph/library/CommunityDetection.java | 46 +--
.../graph/library/ConnectedComponents.java | 54 +--
.../graph/library/GSAConnectedComponents.java | 2 +-
.../library/GSASingleSourceShortestPaths.java | 2 +-
.../flink/graph/library/LabelPropagation.java | 54 +--
.../apache/flink/graph/library/PageRank.java | 53 ++-
.../library/SingleSourceShortestPaths.java | 44 +-
.../flink/graph/library/TriangleEnumerator.java | 6 +-
.../flink/graph/spargel/GatherFunction.java | 251 ++++++++++++
.../flink/graph/spargel/MessagingFunction.java | 338 ---------------
.../flink/graph/spargel/ScatterFunction.java | 338 +++++++++++++++
.../spargel/ScatterGatherConfiguration.java | 44 +- .../graph/spargel/ScatterGatherIteration.java | 407 +++++++++---------- .../graph/spargel/VertexUpdateFunction.java | 251 ------------ .../graph/spargel/SpargelCompilerTest.java | 12 +- .../graph/spargel/SpargelTranslationTest.java | 119 +++--- .../test/CollectionModeSuperstepITCase.java | 35 +- .../test/ScatterGatherConfigurationITCase.java | 227 +++++------ 27 files changed, 1460 insertions(+), 1497 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/docs/apis/batch/libs/gelly.md ---------------------------------------------------------------------- diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md index aa232fc..f063f09 100644 --- a/docs/apis/batch/libs/gelly.md +++ b/docs/apis/batch/libs/gelly.md @@ -1083,10 +1083,10 @@ final class Compute extends ComputeFunction { ### Scatter-Gather Iterations The scatter-gather model, also known as "signal/collect" model, expresses computation from the perspective of a vertex in the graph. The computation proceeds in synchronized iteration steps, called supersteps. In each superstep, a vertex produces messages for other vertices and updates its value based on the messages it receives. To use scatter-gather iterations in Gelly, the user only needs to define how a vertex behaves in each superstep: -* <strong>Messaging</strong>: corresponds to the scatter phase and produces the messages that a vertex will send to other vertices. -* <strong>Value Update</strong>: corresponds to the gather phase and updates the vertex value using the received messages. +* <strong>Scatter</strong>: produces the messages that a vertex will send to other vertices. +* <strong>Gather</strong>: updates the vertex value using received messages. -Gelly provides methods for scatter-gather iterations. 
The user only needs to implement two functions, corresponding to the scatter and gather phases. The first function is a `MessagingFunction`, which allows a vertex to send out messages for other vertices. Messages are recieved during the same superstep as they are sent. The second function is `VertexUpdateFunction`, which defines how a vertex will update its value based on the received messages. +Gelly provides methods for scatter-gather iterations. The user only needs to implement two functions, corresponding to the scatter and gather phases. The first function is a `ScatterFunction`, which allows a vertex to send out messages to other vertices. Messages are received during the same superstep as they are sent. The second function is `GatherFunction`, which defines how a vertex will update its value based on the received messages. These functions and the maximum number of iterations to run are given as parameters to Gelly's `runScatterGatherIteration`. This method will execute the scatter-gather iteration on the input Graph and return a new Graph, with updated vertex values. A scatter-gather iteration can be extended with information such as the total number of vertices, the in degree and out degree. 
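To make the new argument order concrete, here is a minimal plain-Java sketch of the scatter-gather loop described above, with no Flink dependency. `Scatter`, `Gather`, `Edge`, `run(...)` and `exampleRun(...)` are hypothetical names invented for illustration, not Gelly API; `run` takes the scatter function before the gather function, mirroring `runScatterGatherIteration(ScatterFunction, GatherFunction, maxIterations)`. The two SSSP lambdas follow the logic of `MinDistanceMessenger` and `VertexDistanceUpdater` from this diff. Letting only vertices updated in the previous superstep scatter again is an assumed simplification of Gelly's delta iteration, not a description of its exact runtime.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public final class ScatterGatherSketch {

    /** Hypothetical weighted, directed edge (not Gelly's Edge class). */
    static final class Edge {
        final long src;
        final long trg;
        final double weight;

        Edge(long src, long trg, double weight) {
            this.src = src;
            this.trg = trg;
            this.weight = weight;
        }
    }

    /** Hypothetical stand-in for ScatterFunction: writes messages into the outbox. */
    interface Scatter {
        void sendMessages(long id, double value, List<Edge> outEdges, Map<Long, List<Double>> outbox);
    }

    /** Hypothetical stand-in for GatherFunction: returns a new value, or null for "no update". */
    interface Gather {
        Double updateVertex(long id, double value, List<Double> inMessages);
    }

    /** Scatter first, gather second, mirroring the new runScatterGatherIteration order. */
    static Map<Long, Double> run(Map<Long, Double> vertices, List<Edge> edges,
                                 Scatter scatter, Gather gather, int maxIterations) {
        Map<Long, Double> values = new HashMap<>(vertices);
        Set<Long> active = new HashSet<>(values.keySet()); // every vertex scatters in superstep 1
        for (int superstep = 1; superstep <= maxIterations && !active.isEmpty(); superstep++) {
            // Scatter phase: active vertices send messages along their out-edges.
            Map<Long, List<Double>> inbox = new HashMap<>();
            for (long id : active) {
                List<Edge> outEdges = new ArrayList<>();
                for (Edge e : edges) {
                    if (e.src == id) {
                        outEdges.add(e);
                    }
                }
                scatter.sendMessages(id, values.get(id), outEdges, inbox);
            }
            // Gather phase: messages are consumed in the same superstep they were sent.
            Set<Long> updated = new HashSet<>();
            for (Map.Entry<Long, List<Double>> msgs : inbox.entrySet()) {
                Double newValue = gather.updateVertex(msgs.getKey(), values.get(msgs.getKey()), msgs.getValue());
                if (newValue != null) {
                    values.put(msgs.getKey(), newValue);
                    updated.add(msgs.getKey());
                }
            }
            // Assumption: only vertices whose value changed scatter in the next superstep.
            active = updated;
        }
        return values;
    }

    /** Scatter logic of MinDistanceMessenger: propagate finite distances only. */
    static final Scatter MIN_DISTANCE_MESSENGER = (id, value, outEdges, outbox) -> {
        if (value < Double.POSITIVE_INFINITY) {
            for (Edge e : outEdges) {
                outbox.computeIfAbsent(e.trg, k -> new ArrayList<>()).add(value + e.weight);
            }
        }
    };

    /** Gather logic of VertexDistanceUpdater: keep the smallest candidate distance. */
    static final Gather VERTEX_DISTANCE_UPDATER = (id, value, inMessages) -> {
        double minDistance = Double.MAX_VALUE;
        for (double d : inMessages) {
            minDistance = Math.min(minDistance, d);
        }
        if (minDistance < value) {
            return minDistance;
        }
        return null;
    };

    /** Hypothetical four-vertex example with source vertex 1. */
    static Map<Long, Double> exampleRun(int maxIterations) {
        Map<Long, Double> init = new HashMap<>();
        init.put(1L, 0.0);
        for (long id = 2; id <= 4; id++) {
            init.put(id, Double.POSITIVE_INFINITY);
        }
        List<Edge> edges = List.of(
                new Edge(1, 2, 1.0), new Edge(1, 3, 4.0),
                new Edge(2, 3, 2.0), new Edge(3, 4, 1.0));
        return run(init, edges, MIN_DISTANCE_MESSENGER, VERTEX_DISTANCE_UPDATER, maxIterations);
    }

    public static void main(String[] args) {
        System.out.println(exampleRun(10));
    }
}
```

With enough iterations the example settles at distances 0, 1, 3 and 4 for vertices 1 to 4; capping `maxIterations` at 1 stops after the first superstep, leaving vertex 3 at 4.0 and vertex 4 unreached.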
@@ -1109,7 +1109,7 @@ int maxIterations = 10; // Execute the scatter-gather iteration Graph<Long, Double, Double> result = graph.runScatterGatherIteration( - new VertexDistanceUpdater(), new MinDistanceMessenger(), maxIterations); + new MinDistanceMessenger(), new VertexDistanceUpdater(), maxIterations); // Extract the vertices as the result DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices(); @@ -1118,7 +1118,7 @@ DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices(); // - - - UDFs - - - // // scatter: messaging -public static final class MinDistanceMessenger extends MessagingFunction<Long, Double, Double, Double> { +public static final class MinDistanceMessenger extends ScatterFunction<Long, Double, Double, Double> { public void sendMessages(Vertex<Long, Double> vertex) { for (Edge<Long, Double> edge : getEdges()) { @@ -1128,7 +1128,7 @@ public static final class MinDistanceMessenger extends MessagingFunction<Long, D } // gather: vertex update -public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> { +public static final class VertexDistanceUpdater extends GatherFunction<Long, Double, Double> { public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) { Double minDistance = Double.MAX_VALUE; @@ -1157,7 +1157,7 @@ val graph: Graph[Long, Double, Double] = ... 
val maxIterations = 10 // Execute the scatter-gather iteration -val result = graph.runScatterGatherIteration(new VertexDistanceUpdater, new MinDistanceMessenger, maxIterations) +val result = graph.runScatterGatherIteration(new MinDistanceMessenger, new VertexDistanceUpdater, maxIterations) // Extract the vertices as the result val singleSourceShortestPaths = result.getVertices @@ -1166,7 +1166,7 @@ val singleSourceShortestPaths = result.getVertices // - - - UDFs - - - // // messaging -final class MinDistanceMessenger extends MessagingFunction[Long, Double, Double, Double] { +final class MinDistanceMessenger extends ScatterFunction[Long, Double, Double, Double] { override def sendMessages(vertex: Vertex[Long, Double]) = { for (edge: Edge[Long, Double] <- getEdges) { @@ -1176,7 +1176,7 @@ final class MinDistanceMessenger extends MessagingFunction[Long, Double, Double, } // vertex update -final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Double] { +final class VertexDistanceUpdater extends GatherFunction[Long, Double, Double] { override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) = { var minDistance = Double.MaxValue @@ -1211,9 +1211,9 @@ and can be specified using the `setName()` method. * <strong>Solution set in unmanaged memory</strong>: Defines whether the solution set is kept in managed memory (Flink's internal way of keeping objects in serialized form) or as a simple object map. By default, the solution set runs in managed memory. This property can be set using the `setSolutionSetUnmanagedMemory()` method. * <strong>Aggregators</strong>: Iteration aggregators can be registered using the `registerAggregator()` method. An iteration aggregator combines -all aggregates globally once per superstep and makes them available in the next superstep. Registered aggregators can be accessed inside the user-defined `VertexUpdateFunction` and `MessagingFunction`. 
+all aggregates globally once per superstep and makes them available in the next superstep. Registered aggregators can be accessed inside the user-defined `ScatterFunction` and `GatherFunction`. -* <strong>Broadcast Variables</strong>: DataSets can be added as [Broadcast Variables]({{site.baseurl}}/apis/batch/index.html#broadcast-variables) to the `VertexUpdateFunction` and `MessagingFunction`, using the `addBroadcastSetForUpdateFunction()` and `addBroadcastSetForMessagingFunction()` methods, respectively. +* <strong>Broadcast Variables</strong>: DataSets can be added as [Broadcast Variables]({{site.baseurl}}/apis/batch/index.html#broadcast-variables) to the `ScatterFunction` and `GatherFunction`, using the `addBroadcastSetForMessagingFunction()` and `addBroadcastSetForUpdateFunction()` methods, respectively. * <strong>Number of Vertices</strong>: Accessing the total number of vertices within the iteration. This property can be set using the `setOptNumVertices()` method. The number of vertices can then be accessed in the vertex update function and in the messaging function using the `getNumberOfVertices()` method. If the option is not set in the configuration, this method will return -1.
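The aggregator and number-of-vertices options above can be modeled without Flink. In this minimal sketch, `SuperstepContext`, `aggregate`, `endSuperstep` and `visibleAggregates` are hypothetical names; only `getPreviousIterationAggregate()` and `getNumberOfVertices()` echo method names from the text and diff, and their behaviour here is an assumption drawn from the description: partial values are combined once per superstep and become readable in the next one, and the vertex count is -1 unless the option is enabled. Returning null before any superstep has finished is likewise assumed, consistent with the `getSuperstepNumber() > 1` guard in the HITS example in this diff.

```java
import java.util.Arrays;
import java.util.List;

public final class IterationOptionsSketch {

    /** Hypothetical per-iteration context; not a Gelly or Flink class. */
    static final class SuperstepContext {
        private final boolean optNumVertices;
        private final long numVertices;
        private Long previousAggregate; // null until a superstep has completed
        private long currentAggregate;  // partial values collected in this superstep

        SuperstepContext(boolean optNumVertices, long numVertices) {
            this.optNumVertices = optNumVertices;
            this.numVertices = numVertices;
        }

        /** Documented default: -1 unless the number-of-vertices option was set. */
        long getNumberOfVertices() {
            return optNumVertices ? numVertices : -1;
        }

        /** Each vertex contributes a partial value during the superstep. */
        void aggregate(long value) {
            currentAggregate += value;
        }

        /** Global value combined at the end of the previous superstep. */
        Long getPreviousIterationAggregate() {
            return previousAggregate;
        }

        /** Superstep barrier: publish the global total and reset for the next superstep. */
        void endSuperstep() {
            previousAggregate = currentAggregate;
            currentAggregate = 0;
        }
    }

    /**
     * Runs the given number of supersteps; in superstep s every vertex aggregates value * s.
     * Returns the aggregate each superstep could read at its start.
     */
    static Long[] visibleAggregates(List<Long> vertexValues, int supersteps) {
        SuperstepContext ctx = new SuperstepContext(false, vertexValues.size());
        Long[] seen = new Long[supersteps];
        for (int s = 1; s <= supersteps; s++) {
            seen[s - 1] = ctx.getPreviousIterationAggregate();
            for (long v : vertexValues) {
                ctx.aggregate(v * s);
            }
            ctx.endSuperstep();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(visibleAggregates(List.of(1L, 2L, 3L), 3)));
        System.out.println(new SuperstepContext(false, 3).getNumberOfVertices());
        System.out.println(new SuperstepContext(true, 3).getNumberOfVertices());
    }
}
```

The point of the sketch is the one-superstep lag: a value aggregated in superstep s is only visible from superstep s + 1 onward.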
@@ -1245,10 +1245,12 @@ parameters.registerAggregator("sumAggregator", new LongSumAggregator()); // run the scatter-gather iteration, also passing the configuration parameters Graph<Long, Double, Double> result = graph.runScatterGatherIteration( - new VertexUpdater(), new Messenger(), maxIterations, parameters); + new Messenger(), new VertexUpdater(), maxIterations, parameters); // user-defined functions -public static final class VertexUpdater extends VertexUpdateFunction { +public static final class Messenger extends ScatterFunction {...} + +public static final class VertexUpdater extends GatherFunction { LongSumAggregator aggregator = new LongSumAggregator(); @@ -1272,8 +1274,6 @@ public static final class VertexUpdater extends VertexUpdateFunction { } } -public static final class Messenger extends MessagingFunction {...} - {% endhighlight %} </div> @@ -1294,10 +1294,12 @@ parameters.setParallelism(16) parameters.registerAggregator("sumAggregator", new LongSumAggregator) // run the scatter-gather iteration, also passing the configuration parameters -val result = graph.runScatterGatherIteration(new VertexUpdater, new Messenger, maxIterations, parameters) +val result = graph.runScatterGatherIteration(new Messenger, new VertexUpdater, maxIterations, parameters) // user-defined functions -final class VertexUpdater extends VertexUpdateFunction { +final class Messenger extends ScatterFunction {...} + +final class VertexUpdater extends GatherFunction { var aggregator = new LongSumAggregator @@ -1321,8 +1323,6 @@ final class VertexUpdater extends VertexUpdateFunction { } } -final class Messenger extends MessagingFunction {...} - {% endhighlight %} </div> </div> @@ -1347,20 +1347,20 @@ parameters.setOptDegrees(true); // run the scatter-gather iteration, also passing the configuration parameters Graph<Long, Double, Double> result = graph.runScatterGatherIteration( - new VertexUpdater(), new Messenger(), maxIterations, parameters); + new Messenger(), new VertexUpdater(), 
maxIterations, parameters); // user-defined functions -public static final class VertexUpdater { +public static final class Messenger extends ScatterFunction { ... - // get the number of vertices - long numVertices = getNumberOfVertices(); + // retrieve the vertex out-degree + outDegree = getOutDegree(); ... } -public static final class Messenger { +public static final class VertexUpdater extends GatherFunction { ... - // retrieve the vertex out-degree - outDegree = getOutDegree(); + // get the number of vertices + long numVertices = getNumberOfVertices(); ... } @@ -1382,20 +1382,20 @@ parameters.setOptNumVertices(true) parameters.setOptDegrees(true) // run the scatter-gather iteration, also passing the configuration parameters -val result = graph.runScatterGatherIteration(new VertexUpdater, new Messenger, maxIterations, parameters) +val result = graph.runScatterGatherIteration(new Messenger, new VertexUpdater, maxIterations, parameters) // user-defined functions -final class VertexUpdater { +final class Messenger extends ScatterFunction { ... - // get the number of vertices - val numVertices = getNumberOfVertices + // retrieve the vertex out-degree + val outDegree = getOutDegree ... } -final class Messenger { +final class VertexUpdater extends GatherFunction { ... - // retrieve the vertex out-degree - val outDegree = getOutDegree + // get the number of vertices + val numVertices = getNumberOfVertices ... 
} @@ -1419,13 +1419,13 @@ parameters.setDirection(EdgeDirection.IN); // run the scatter-gather iteration, also passing the configuration parameters DataSet<Vertex<Long, HashSet<Long>>> result = graph.runScatterGatherIteration( - new VertexUpdater(), new Messenger(), maxIterations, parameters) + new Messenger(), new VertexUpdater(), maxIterations, parameters) .getVertices(); // user-defined functions -public static final class VertexUpdater {...} +public static final class Messenger extends ScatterFunction {...} -public static final class Messenger {...} +public static final class VertexUpdater extends GatherFunction {...} {% endhighlight %} </div> @@ -1441,13 +1441,13 @@ val parameters = new ScatterGatherConfiguration parameters.setDirection(EdgeDirection.IN) // run the scatter-gather iteration, also passing the configuration parameters -val result = graph.runScatterGatherIteration(new VertexUpdater, new Messenger, maxIterations, parameters) +val result = graph.runScatterGatherIteration(new Messenger, new VertexUpdater, maxIterations, parameters) .getVertices // user-defined functions -final class VertexUpdater {...} +final class Messenger extends ScatterFunction {...} -final class Messenger {...} +final class VertexUpdater extends GatherFunction {...} {% endhighlight %} </div> http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java index 129d2a6..ff5a2e9 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java @@
-21,17 +21,16 @@ package org.apache.flink.graph.examples; import org.apache.flink.api.common.aggregators.DoubleSumAggregator; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; - import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Edge; import org.apache.flink.graph.EdgeDirection; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.spargel.GatherFunction; import org.apache.flink.graph.spargel.MessageIterator; -import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.ScatterFunction; import org.apache.flink.graph.spargel.ScatterGatherConfiguration; -import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.types.DoubleValue; import org.apache.flink.util.Preconditions; @@ -100,17 +99,55 @@ public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataS parameter.registerAggregator("diffValueSum", new DoubleSumAggregator()); return newGraph - .runScatterGatherIteration(new VertexUpdate<K>(maxIterations, convergeThreshold), - new MessageUpdate<K>(maxIterations), maxIterations, parameter) + .runScatterGatherIteration(new MessageUpdate<K>(maxIterations), + new VertexUpdate<K>(maxIterations, convergeThreshold), maxIterations, parameter) .getVertices(); } /** + * Distributes the value of a vertex among all neighbor vertices and sum all the + * value in every superstep. 
+ */ + private static final class MessageUpdate<K> extends ScatterFunction<K, Tuple2<DoubleValue, DoubleValue>, Double, Boolean> { + private int maxIteration; + + public MessageUpdate(int maxIteration) { + this.maxIteration = maxIteration; + } + + @Override + public void sendMessages(Vertex<K, Tuple2<DoubleValue, DoubleValue>> vertex) { + // in the first iteration, no aggregation to call, init sum with value of vertex + double iterationValueSum = 1.0; + + if (getSuperstepNumber() > 1) { + iterationValueSum = Math.sqrt(((DoubleValue) getPreviousIterationAggregate("updatedValueSum")).getValue()); + } + for (Edge<K, Boolean> edge : getEdges()) { + if (getSuperstepNumber() != maxIteration) { + if (getSuperstepNumber() % 2 == 1) { + if (edge.getValue()) { + sendMessageTo(edge.getTarget(), vertex.getValue().f0.getValue() / iterationValueSum); + } + } else { + if (!edge.getValue()) { + sendMessageTo(edge.getTarget(), vertex.getValue().f1.getValue() / iterationValueSum); + } + } + } else { + if (!edge.getValue()) { + sendMessageTo(edge.getTarget(), iterationValueSum); + } + } + } + } + } + + /** * Function that updates the value of a vertex by summing up the partial * values from all messages and normalize the value. 
*/ - @SuppressWarnings("serial") - public static final class VertexUpdate<K> extends VertexUpdateFunction<K, Tuple2<DoubleValue, DoubleValue>, Double> { + private static final class VertexUpdate<K> extends GatherFunction<K, Tuple2<DoubleValue, DoubleValue>, Double> { private int maxIteration; private double convergeThreshold; private DoubleSumAggregator updatedValueSumAggregator; @@ -162,7 +199,7 @@ public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataS diffValueSum = ((DoubleValue) getPreviousIterationAggregate("diffValueSum")).getValue(); } authoritySumAggregator.aggregate(previousAuthAverage); - + if (diffValueSum > convergeThreshold) { newHubValue.setValue(newHubValue.getValue() / iterationValueSum); newAuthorityValue.setValue(updateValue); @@ -191,77 +228,24 @@ public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataS } } - /** - * Distributes the value of a vertex among all neighbor vertices and sum all the - * value in every superstep. 
- */ - @SuppressWarnings("serial") - public static final class MessageUpdate<K> extends MessagingFunction<K, Tuple2<DoubleValue, DoubleValue>, Double, Boolean> { - private int maxIteration; - - public MessageUpdate(int maxIteration) { - this.maxIteration = maxIteration; - } - - @Override - public void sendMessages(Vertex<K, Tuple2<DoubleValue, DoubleValue>> vertex) { - - // in the first iteration, no aggregation to call, init sum with value of vertex - double iterationValueSum = 1.0; - - if (getSuperstepNumber() > 1) { - iterationValueSum = Math.sqrt(((DoubleValue) getPreviousIterationAggregate("updatedValueSum")).getValue()); - } - for (Edge<K, Boolean> edge : getEdges()) { - if (getSuperstepNumber() != maxIteration) { - if (getSuperstepNumber() % 2 == 1) { - if (edge.getValue()) { - sendMessageTo(edge.getTarget(), vertex.getValue().f0.getValue() / iterationValueSum); - } - } else { - if (!edge.getValue()) { - sendMessageTo(edge.getTarget(), vertex.getValue().f1.getValue() / iterationValueSum); - } - } - } else { - if (!edge.getValue()) { - sendMessageTo(edge.getTarget(), iterationValueSum); - } - } - } - } - } - - public static class VertexInitMapper<K, VV> implements MapFunction<Vertex<K, VV>, Tuple2<DoubleValue, DoubleValue>> { - - private static final long serialVersionUID = 1L; - + private static class VertexInitMapper<K, VV> implements MapFunction<Vertex<K, VV>, Tuple2<DoubleValue, DoubleValue>> { private Tuple2<DoubleValue, DoubleValue> initVertexValue = new Tuple2<>(new DoubleValue(1.0), new DoubleValue(1.0)); public Tuple2<DoubleValue, DoubleValue> map(Vertex<K, VV> value) { - //init hub and authority value of each vertex return initVertexValue; } } - public static class AuthorityEdgeMapper<K, EV> implements MapFunction<Edge<K, EV>, Boolean> { - - private static final long serialVersionUID = 1L; - + private static class AuthorityEdgeMapper<K, EV> implements MapFunction<Edge<K, EV>, Boolean> { public Boolean map(Edge<K, EV> edge) { - // mark edge as true 
for authority updating return true; } } - public static class HubEdgeMapper<K, EV> implements MapFunction<Edge<K, EV>, Boolean> { - - private static final long serialVersionUID = 1L; - + private static class HubEdgeMapper<K, EV> implements MapFunction<Edge<K, EV>, Boolean> { public Boolean map(Edge<K, EV> edge) { - // mark edge as false for hub updating return false; } http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java index 26e419f..631384c 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java @@ -27,10 +27,10 @@ import org.apache.flink.graph.EdgeDirection; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.examples.data.IncrementalSSSPData; +import org.apache.flink.graph.spargel.GatherFunction; import org.apache.flink.graph.spargel.MessageIterator; -import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.ScatterFunction; import org.apache.flink.graph.spargel.ScatterGatherConfiguration; -import org.apache.flink.graph.spargel.VertexUpdateFunction; /** * This example illustrates how to @@ -97,8 +97,8 @@ public class IncrementalSSSP implements ProgramDescription { parameters.setOptDegrees(true); // run the scatter-gather iteration to propagate info - Graph<Long, Double, Double> result = ssspGraph.runScatterGatherIteration(new VertexDistanceUpdater(), - new InvalidateMessenger(edgeToBeRemoved), 
maxIterations, parameters); + Graph<Long, Double, Double> result = ssspGraph.runScatterGatherIteration(new InvalidateMessenger(edgeToBeRemoved), + new VertexDistanceUpdater(), maxIterations, parameters); DataSet<Vertex<Long, Double>> resultedVertices = result.getVertices(); @@ -147,22 +147,7 @@ public class IncrementalSSSP implements ProgramDescription { }).count() > 0; } - public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> { - - @Override - public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception { - if (inMessages.hasNext()) { - Long outDegree = getOutDegree() - 1; - // check if the vertex has another SP-Edge - if (outDegree <= 0) { - // set own value to infinity - setNewVertexValue(Double.MAX_VALUE); - } - } - } - } - - public static final class InvalidateMessenger extends MessagingFunction<Long, Double, Double, Double> { + public static final class InvalidateMessenger extends ScatterFunction<Long, Double, Double, Double> { private Edge<Long, Double> edgeToBeRemoved; @@ -190,6 +175,21 @@ public class IncrementalSSSP implements ProgramDescription { } } + public static final class VertexDistanceUpdater extends GatherFunction<Long, Double, Double> { + + @Override + public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception { + if (inMessages.hasNext()) { + Long outDegree = getOutDegree() - 1; + // check if the vertex has another SP-Edge + if (outDegree <= 0) { + // set own value to infinity + setNewVertexValue(Double.MAX_VALUE); + } + } + } + } + // ****************************************************************************************************************** // UTIL METHODS // ****************************************************************************************************************** 
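The gather side of `IncrementalSSSP` is compact enough to restate on its own. This plain-Java sketch reproduces only the rule visible in `VertexDistanceUpdater` above. `InvalidationRuleSketch`, `afterInvalidation` and its parameters are hypothetical names; the message count and out-degree are passed in explicitly, where Gelly would supply them through the `MessageIterator` and `getOutDegree()` (the example enables degrees with `parameters.setOptDegrees(true)`).

```java
public final class InvalidationRuleSketch {

    /**
     * Restates the gather rule of IncrementalSSSP.VertexDistanceUpdater from this diff:
     * after receiving at least one invalidation message, the vertex discounts the removed
     * edge from its out-degree; if no edge remains, its distance becomes Double.MAX_VALUE.
     * Otherwise the distance is returned unchanged (the original simply skips
     * setNewVertexValue in that case).
     */
    static double afterInvalidation(double distance, int messageCount, long outDegree) {
        if (messageCount > 0) {
            long remaining = outDegree - 1; // the edge being removed no longer counts
            if (remaining <= 0) {
                return Double.MAX_VALUE; // no other shortest-path edge: treat as unreachable
            }
        }
        return distance;
    }

    public static void main(String[] args) {
        System.out.println(afterInvalidation(3.0, 1, 1)); // only shortest-path edge removed
        System.out.println(afterInvalidation(3.0, 1, 2)); // another shortest-path edge survives
        System.out.println(afterInvalidation(3.0, 0, 1)); // vertex received no invalidation
    }
}
```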
http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java index c9abf02..68d20e0 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java @@ -26,9 +26,9 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.spargel.GatherFunction; import org.apache.flink.graph.spargel.MessageIterator; -import org.apache.flink.graph.spargel.MessagingFunction; -import org.apache.flink.graph.spargel.VertexUpdateFunction; +import org.apache.flink.graph.spargel.ScatterFunction; import org.apache.flink.graph.utils.Tuple3ToEdgeMap; /** @@ -62,7 +62,7 @@ public class SingleSourceShortestPaths implements ProgramDescription { // Execute the scatter-gather iteration Graph<Long, Double, Double> result = graph.runScatterGatherIteration( - new VertexDistanceUpdater(), new MinDistanceMessenger(), maxIterations); + new MinDistanceMessenger(), new VertexDistanceUpdater(), maxIterations); // Extract the vertices as the result DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices(); @@ -103,11 +103,28 @@ public class SingleSourceShortestPaths implements ProgramDescription { } /** + * Distributes the minimum distance associated with a given vertex among all + * the target vertices summed up with the edge's value. 
+ */ + @SuppressWarnings("serial") + private static final class MinDistanceMessenger extends ScatterFunction<Long, Double, Double, Double> { + + @Override + public void sendMessages(Vertex<Long, Double> vertex) { + if (vertex.getValue() < Double.POSITIVE_INFINITY) { + for (Edge<Long, Double> edge : getEdges()) { + sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue()); + } + } + } + } + + /** * Function that updates the value of a vertex by picking the minimum * distance from all incoming messages. */ @SuppressWarnings("serial") - public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> { + private static final class VertexDistanceUpdater extends GatherFunction<Long, Double, Double> { @Override public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) { @@ -126,23 +143,6 @@ public class SingleSourceShortestPaths implements ProgramDescription { } } - /** - * Distributes the minimum distance associated with a given vertex among all - * the target vertices summed up with the edge's value. 
- */ - @SuppressWarnings("serial") - public static final class MinDistanceMessenger extends MessagingFunction<Long, Double, Double, Double> { - - @Override - public void sendMessages(Vertex<Long, Double> vertex) { - if (vertex.getValue() < Double.POSITIVE_INFINITY) { - for (Edge<Long, Double> edge : getEdges()) { - sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue()); - } - } - } - } - // ****************************************************************************************************************** // UTIL METHODS // ****************************************************************************************************************** http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala index 4f84bb0..2d623e7 100644 --- a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala +++ b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala @@ -22,11 +22,10 @@ import org.apache.flink.api.scala._ import org.apache.flink.graph.scala._ import org.apache.flink.graph.Edge import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.graph.spargel.VertexUpdateFunction -import org.apache.flink.graph.spargel.MessageIterator +import org.apache.flink.graph.spargel.{MessageIterator, ScatterFunction, GatherFunction} import org.apache.flink.graph.Vertex -import org.apache.flink.graph.spargel.MessagingFunction import 
org.apache.flink.graph.examples.data.SingleSourceShortestPathsData + import scala.collection.JavaConversions._ import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap @@ -55,8 +54,8 @@ object SingleSourceShortestPaths { val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env) // Execute the scatter-gather iteration - val result = graph.runScatterGatherIteration(new VertexDistanceUpdater, - new MinDistanceMessenger, maxIterations) + val result = graph.runScatterGatherIteration(new MinDistanceMessenger, + new VertexDistanceUpdater, maxIterations) // Extract the vertices as the result val singleSourceShortestPaths = result.getVertices @@ -86,10 +85,26 @@ object SingleSourceShortestPaths { } /** - * Function that updates the value of a vertex by picking the minimum - * distance from all incoming messages. + * Distributes the minimum distance associated with a given vertex among all + * the target vertices summed up with the edge's value. */ - private final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Double] { + private final class MinDistanceMessenger extends + ScatterFunction[Long, Double, Double, Double] { + + override def sendMessages(vertex: Vertex[Long, Double]) { + if (vertex.getValue < Double.PositiveInfinity) { + for (edge: Edge[Long, Double] <- getEdges) { + sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue) + } + } + } + } + + /** + * Function that updates the value of a vertex by picking the minimum + * distance from all incoming messages. + */ + private final class VertexDistanceUpdater extends GatherFunction[Long, Double, Double] { override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) { var minDistance = Double.MaxValue @@ -105,22 +120,6 @@ object SingleSourceShortestPaths { } } - /** - * Distributes the minimum distance associated with a given vertex among all - * the target vertices summed up with the edge's value. 
- */ - private final class MinDistanceMessenger extends - MessagingFunction[Long, Double, Double, Double] { - - override def sendMessages(vertex: Vertex[Long, Double]) { - if (vertex.getValue < Double.PositiveInfinity) { - for (edge: Edge[Long, Double] <- getEdges) { - sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue) - } - } - } - } - // **************************************************************************** // UTIL METHODS // **************************************************************************** http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java index d27dcd8..24b8cf1 100644 --- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java @@ -111,8 +111,8 @@ public class IncrementalSSSPITCase extends MultipleProgramsTestBase { // run the scatter gather iteration to propagate info Graph<Long, Double, Double> result = ssspGraph.runScatterGatherIteration( - new IncrementalSSSP.VertexDistanceUpdater(), new IncrementalSSSP.InvalidateMessenger(edgeToBeRemoved), + new IncrementalSSSP.VertexDistanceUpdater(), IncrementalSSSPData.NUM_VERTICES, parameters); DataSet<Vertex<Long, Double>> resultedVertices = result.getVertices(); http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala ---------------------------------------------------------------------- 
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index f7e13ba..4dd9d12 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -24,9 +24,9 @@ import org.apache.flink.api.java.{tuple => jtuple}
 import org.apache.flink.api.scala._
 import org.apache.flink.graph._
 import org.apache.flink.graph.asm.translate.TranslateFunction
-import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, GatherFunction, SumFunction}
+import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, SumFunction, GatherFunction => GSAGatherFunction}
 import org.apache.flink.graph.pregel.{ComputeFunction, MessageCombiner, VertexCentricConfiguration}
-import org.apache.flink.graph.spargel.{MessagingFunction, ScatterGatherConfiguration, VertexUpdateFunction}
+import org.apache.flink.graph.spargel.{ScatterFunction, ScatterGatherConfiguration, GatherFunction => SpargelGatherFunction}
 import org.apache.flink.graph.validation.GraphValidator
 import org.apache.flink.types.{LongValue, NullValue}
 import org.apache.flink.util.Preconditions
@@ -38,8 +38,8 @@ import _root_.scala.reflect.ClassTag
 object Graph {
 /**
- * Creates a Graph from a DataSet of vertices and a DataSet of edges.
- */
+ * Creates a Graph from a DataSet of vertices and a DataSet of edges.
+ */
 def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](vertices: DataSet[Vertex[K, VV]], edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, VV, EV] = {
@@ -47,19 +47,19 @@ object Graph {
 }
 /**
- * Creates a Graph from a DataSet of edges.
- * Vertices are created automatically and their values are set to NullValue.
- */
+ * Creates a Graph from a DataSet of edges.
+ * Vertices are created automatically and their values are set to NullValue.
+ */
 def fromDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
 (edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
 wrapGraph(jg.Graph.fromDataSet[K, EV](edges.javaSet, env.getJavaEnv))
 }
 /**
- * Creates a graph from a DataSet of edges.
- * Vertices are created automatically and their values are set by applying the provided
- * vertexValueInitializer map function to the vertex ids.
- */
+ * Creates a graph from a DataSet of edges.
+ * Vertices are created automatically and their values are set by applying the provided
+ * vertexValueInitializer map function to the vertex ids.
+ */
 def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](edges: DataSet[Edge[K, EV]], vertexValueInitializer: MapFunction[K, VV], env: ExecutionEnvironment): Graph[K, VV, EV] = {
@@ -68,8 +68,8 @@ object Graph {
 }
 /**
- * Creates a Graph from a Seq of vertices and a Seq of edges.
- */
+ * Creates a Graph from a Seq of vertices and a Seq of edges.
+ */
 def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](vertices: Seq[Vertex[K, VV]], edges: Seq[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, VV, EV] = {
@@ -78,19 +78,19 @@ object Graph {
 }
 /**
- * Creates a Graph from a Seq of edges.
- * Vertices are created automatically and their values are set to NullValue.
- */
+ * Creates a Graph from a Seq of edges.
+ * Vertices are created automatically and their values are set to NullValue.
+ */
 def fromCollection[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
 (edges: Seq[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
 wrapGraph(jg.Graph.fromCollection[K, EV](edges.asJavaCollection, env.getJavaEnv))
 }
 /**
- * Creates a graph from a Seq of edges.
- * Vertices are created automatically and their values are set by applying the provided
- * vertexValueInitializer map function to the vertex ids.
- */
+ * Creates a graph from a Seq of edges.
+ * Vertices are created automatically and their values are set by applying the provided
+ * vertexValueInitializer map function to the vertex ids.
+ */
 def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](edges: Seq[Edge[K, EV]], vertexValueInitializer: MapFunction[K, VV], env: ExecutionEnvironment): Graph[K, VV, EV] = {
@@ -105,7 +105,7 @@ object Graph {
 * The first field of the Tuple3 object for edges will become the source ID,
 * the second field will become the target ID, and the third field will become
 * the edge value.
- */
+ */
 def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](vertices: DataSet[(K, VV)], edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, VV, EV] = {
@@ -116,12 +116,12 @@ object Graph {
 }
 /**
- * Creates a Graph from a DataSet of Tuples representing the edges.
- * The first field of the Tuple3 object for edges will become the source ID,
- * the second field will become the target ID, and the third field will become
- * the edge value.
- * Vertices are created automatically and their values are set to NullValue.
- */
+ * Creates a Graph from a DataSet of Tuples representing the edges.
+ * The first field of the Tuple3 object for edges will become the source ID,
+ * the second field will become the target ID, and the third field will become
+ * the edge value.
+ * Vertices are created automatically and their values are set to NullValue.
+ */
 def fromTupleDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
 (edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
 val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
@@ -129,13 +129,13 @@ object Graph {
 }
 /**
- * Creates a Graph from a DataSet of Tuples representing the edges.
- * The first field of the Tuple3 object for edges will become the source ID,
- * the second field will become the target ID, and the third field will become
- * the edge value.
- * Vertices are created automatically and their values are set by applying the provided
- * vertexValueInitializer map function to the vertex ids.
- */
+ * Creates a Graph from a DataSet of Tuples representing the edges.
+ * The first field of the Tuple3 object for edges will become the source ID,
+ * the second field will become the target ID, and the third field will become
+ * the edge value.
+ * Vertices are created automatically and their values are set by applying the provided
+ * vertexValueInitializer map function to the vertex ids.
+ */
 def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](edges: DataSet[(K, K, EV)], vertexValueInitializer: MapFunction[K, VV], env: ExecutionEnvironment): Graph[K, VV, EV] = {
@@ -144,12 +144,12 @@ object Graph {
 env.getJavaEnv))
 }
- /**
- * Creates a Graph from a DataSet of Tuple2's representing the edges.
- * The first field of the Tuple2 object for edges will become the source ID,
- * the second field will become the target ID. The edge value will be set to NullValue.
- * Vertices are created automatically and their values are set to NullValue.
- */
+ /**
+ * Creates a Graph from a DataSet of Tuple2's representing the edges.
+ * The first field of the Tuple2 object for edges will become the source ID,
+ * the second field will become the target ID. The edge value will be set to NullValue.
+ * Vertices are created automatically and their values are set to NullValue.
+ */
 def fromTuple2DataSet[K: TypeInformation : ClassTag](edges: DataSet[(K, K)], env: ExecutionEnvironment): Graph[K, NullValue, NullValue] = {
 val javaTupleEdges = edges.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet
@@ -157,12 +157,12 @@ object Graph {
 }
 /**
- * Creates a Graph from a DataSet of Tuple2's representing the edges.
- * The first field of the Tuple2 object for edges will become the source ID,
- * the second field will become the target ID. The edge value will be set to NullValue.
- * Vertices are created automatically and their values are set by applying the provided
- * vertexValueInitializer map function to the vertex IDs.
- */
+ * Creates a Graph from a DataSet of Tuple2's representing the edges.
+ * The first field of the Tuple2 object for edges will become the source ID,
+ * the second field will become the target ID. The edge value will be set to NullValue.
+ * Vertices are created automatically and their values are set by applying the provided
+ * vertexValueInitializer map function to the vertex IDs.
+ */
 def fromTuple2DataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag]
 (edges: DataSet[(K, K)], vertexValueInitializer: MapFunction[K, VV], env: ExecutionEnvironment): Graph[K, VV, NullValue] = {
@@ -172,52 +172,52 @@ object Graph {
 }
 /** Creates a Graph from a CSV file of edges.
- *
- * The edge value is read from the CSV file if [[EV]] is not of type [[NullValue]]. Otherwise the
- * edge value is set to [[NullValue]].
- *
- * If the vertex value type [[VV]] is specified (unequal [[NullValue]]), then the vertex values
- * are read from the file specified by pathVertices. If the path has not been specified then the
- * vertexValueInitializer is used to initialize the vertex values of the vertices extracted from
- * the set of edges. If the vertexValueInitializer has not been set either, then the method
- * fails.
- *
- * @param env The Execution Environment.
- * @param pathEdges The file path containing the edges.
- * @param pathVertices The file path containing the vertices.
- * @param lineDelimiterVertices The string that separates lines in the vertices file. It defaults
- * to newline.
- * @param fieldDelimiterVertices The string that separates vertex Ids from vertex values in the
- * vertices file.
- * @param quoteCharacterVertices The character to use for quoted String parsing in the vertices
- * file. Disabled by default.
- * @param ignoreFirstLineVertices Whether the first line in the vertices file should be ignored.
- * @param ignoreCommentsVertices Lines that start with the given String in the vertices file
- * are ignored, disabled by default.
- * @param lenientVertices Whether the parser should silently ignore malformed lines in the
- * vertices file.
- * @param includedFieldsVertices The fields in the vertices file that should be read. By default
- * all fields are read.
- * @param lineDelimiterEdges The string that separates lines in the edges file. It defaults to
- * newline.
- * @param fieldDelimiterEdges The string that separates fields in the edges file.
- * @param quoteCharacterEdges The character to use for quoted String parsing in the edges file.
- * Disabled by default.
- * @param ignoreFirstLineEdges Whether the first line in the vertices file should be ignored.
- * @param ignoreCommentsEdges Lines that start with the given String in the edges file are
- * ignored, disabled by default.
- * @param lenientEdges Whether the parser should silently ignore malformed lines in the edges
- * file.
- * @param includedFieldsEdges The fields in the edges file that should be read. By default all
- * fields are read.
- * @param vertexValueInitializer If no vertex values are provided, this mapper can be used to
- * initialize them, by applying a map transformation on the vertex
- * IDs.
- * @tparam K Vertex key type
- * @tparam VV Vertex value type
- * @tparam EV Edge value type
- * @return Graph with vertices and edges read from the given files.
- */
+ *
+ * The edge value is read from the CSV file if [[EV]] is not of type [[NullValue]]. Otherwise the
+ * edge value is set to [[NullValue]].
+ *
+ * If the vertex value type [[VV]] is specified (unequal [[NullValue]]), then the vertex values
+ * are read from the file specified by pathVertices. If the path has not been specified then the
+ * vertexValueInitializer is used to initialize the vertex values of the vertices extracted from
+ * the set of edges. If the vertexValueInitializer has not been set either, then the method
+ * fails.
+ *
+ * @param env The Execution Environment.
+ * @param pathEdges The file path containing the edges.
+ * @param pathVertices The file path containing the vertices.
+ * @param lineDelimiterVertices The string that separates lines in the vertices file. It defaults
+ * to newline.
+ * @param fieldDelimiterVertices The string that separates vertex Ids from vertex values in the
+ * vertices file.
+ * @param quoteCharacterVertices The character to use for quoted String parsing in the vertices
+ * file. Disabled by default.
+ * @param ignoreFirstLineVertices Whether the first line in the vertices file should be ignored.
+ * @param ignoreCommentsVertices Lines that start with the given String in the vertices file
+ * are ignored, disabled by default.
+ * @param lenientVertices Whether the parser should silently ignore malformed lines in the
+ * vertices file.
+ * @param includedFieldsVertices The fields in the vertices file that should be read. By default
+ * all fields are read.
+ * @param lineDelimiterEdges The string that separates lines in the edges file. It defaults to
+ * newline.
+ * @param fieldDelimiterEdges The string that separates fields in the edges file.
+ * @param quoteCharacterEdges The character to use for quoted String parsing in the edges file.
+ * Disabled by default.
+ * @param ignoreFirstLineEdges Whether the first line in the vertices file should be ignored.
+ * @param ignoreCommentsEdges Lines that start with the given String in the edges file are
+ * ignored, disabled by default.
+ * @param lenientEdges Whether the parser should silently ignore malformed lines in the edges
+ * file.
+ * @param includedFieldsEdges The fields in the edges file that should be read. By default all
+ * fields are read.
+ * @param vertexValueInitializer If no vertex values are provided, this mapper can be used to
+ * initialize them, by applying a map transformation on the vertex
+ * IDs.
+ * @tparam K Vertex key type
+ * @tparam VV Vertex value type
+ * @tparam EV Edge value type
+ * @return Graph with vertices and edges read from the given files.
+ */
 // scalastyle:off
 // This method exceeds the max allowed number of parameters -->
 def fromCsvReader[
@@ -295,6 +295,7 @@ object Graph {
 /**
 * Represents a graph consisting of {@link Edge edges} and {@link Vertex vertices}.
+ *
 * @param jgraph the underlying java api Graph.
 * @tparam K the key type for vertex and edge identifiers
 * @tparam VV the value type for vertices
@@ -341,9 +342,9 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
 }
 /**
- * @return a DataSet of Triplets,
- * consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue)
- */
+ * @return a DataSet of Triplets,
+ * consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue)
+ */
 def getTriplets(): DataSet[Triplet[K, VV, EV]] = {
 wrap(jgraph.getTriplets())
 }
@@ -418,11 +419,11 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
 }
 /**
- * Translate vertex and edge IDs using the given function.
- *
- * @param fun implements conversion from K to NEW
- * @return graph with translated vertex and edge IDs
- */
+ * Translate vertex and edge IDs using the given function.
+ *
+ * @param fun implements conversion from K to NEW
+ * @return graph with translated vertex and edge IDs
+ */
 def translateGraphIds[NEW: TypeInformation : ClassTag](fun: (K, NEW) => NEW): Graph[NEW, VV, EV] = {
 val translator: TranslateFunction[K, NEW] = new TranslateFunction[K, NEW] {
@@ -446,11 +447,11 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
 }
 /**
- * Translate vertex values using the given function.
- *
- * @param fun implements conversion from VV to NEW
- * @return graph with translated vertex values
- */
+ * Translate vertex values using the given function.
+ *
+ * @param fun implements conversion from VV to NEW
+ * @return graph with translated vertex values
+ */
 def translateVertexValues[NEW: TypeInformation : ClassTag](fun: (VV, NEW) => NEW): Graph[K, NEW, EV] = {
 val translator: TranslateFunction[VV, NEW] = new TranslateFunction[VV, NEW] {
@@ -474,11 +475,11 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
 }
 /**
- * Translate edge values using the given function.
- *
- * @param fun implements conversion from EV to NEW
- * @return graph with translated edge values
- */
+ * Translate edge values using the given function.
+ *
+ * @param fun implements conversion from EV to NEW
+ * @return graph with translated edge values
+ */
 def translateEdgeValues[NEW: TypeInformation : ClassTag](fun: (EV, NEW) => NEW): Graph[K, VV, NEW] = {
 val translator: TranslateFunction[EV, NEW] = new TranslateFunction[EV, NEW] {
@@ -503,9 +504,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
 * of the matched Tuple2 from the input DataSet.
 * @return a new Graph, where the vertex values have been updated according to the
 * result of the vertexJoinFunction.
- *
 * @tparam T the type of the second field of the input Tuple2 DataSet.
- */
+ */
 def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], vertexJoinFunction: VertexJoinFunction[VV, T]): Graph[K, VV, EV] = {
 val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
@@ -526,9 +526,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
 * of the matched Tuple2 from the input DataSet.
 * @return a new Graph, where the vertex values have been updated according to the
 * result of the vertexJoinFunction.
- *
 * @tparam T the type of the second field of the input Tuple2 DataSet.
- */
+ */
 def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (VV, T) => VV): Graph[K, VV, EV] = {
 val newVertexJoin = new VertexJoinFunction[VV, T]() {
@@ -554,11 +553,10 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
 * @param edgeJoinFunction the transformation function to apply.
 * The first parameter is the current edge value and the second parameter is the value
 * of the matched Tuple3 from the input DataSet.
- *
 * @tparam T the type of the third field of the input Tuple3 DataSet.
 * @return a new Graph, where the edge values have been updated according to the
 * result of the edgeJoinFunction.
- */
+ */
 def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = {
 val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple3(scalatuple._1,
@@ -577,11 +575,10 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
 * @param fun the transformation function to apply.
 * The first parameter is the current edge value and the second parameter is the value
 * of the matched Tuple3 from the input DataSet.
- *
 * @tparam T the type of the third field of the input Tuple3 DataSet.
 * @return a new Graph, where the edge values have been updated according to the
 * result of the edgeJoinFunction.
- */
+ */
 def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], fun: (EV, T) => EV): Graph[K, VV, EV] = {
 val newEdgeJoin = new EdgeJoinFunction[EV, T]() {
@@ -611,7 +608,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
 * @tparam T the type of the second field of the input Tuple2 DataSet.
 * @return a new Graph, where the edge values have been updated according to the
 * result of the edgeJoinFunction.
- */
+ */
 def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)], edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = {
 val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
@@ -634,7 +631,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
 * @tparam T the type of the second field of the input Tuple2 DataSet.
 * @return a new Graph, where the edge values have been updated according to the
 * result of the edgeJoinFunction.
- */
+ */
 def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (EV, T) => EV): Graph[K, VV, EV] = {
 val newEdgeJoin = new EdgeJoinFunction[EV, T]() {
@@ -664,7 +661,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
 * @tparam T the type of the second field of the input Tuple2 DataSet.
 * @return a new Graph, where the edge values have been updated according to the
 * result of the edgeJoinFunction.
- */
+ */
 def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)], edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = {
 val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
@@ -687,7 +684,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
 * @tparam T the type of the second field of the input Tuple2 DataSet.
 * @return a new Graph, where the edge values have been updated according to the
 * result of the edgeJoinFunction.
- */
+ */
 def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (EV, T) => EV): Graph[K, VV, EV] = {
 val newEdgeJoin = new EdgeJoinFunction[EV, T]() {
@@ -945,30 +942,30 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
 }
 /**
- * Adds the list of vertices, passed as input, to the graph.
- * If the vertices already exist in the graph, they will not be added once more.
- *
- * @param vertices the list of vertices to add
- * @return the new graph containing the existing and newly added vertices
- */
+ * Adds the list of vertices, passed as input, to the graph.
+ * If the vertices already exist in the graph, they will not be added once more.
+ *
+ * @param vertices the list of vertices to add
+ * @return the new graph containing the existing and newly added vertices
+ */
 def addVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV] = {
 wrapGraph(jgraph.addVertices(vertices.asJava))
 }
 /**
- * Adds the given list edges to the graph.
- *
- * When adding an edge for a non-existing set of vertices,
- * the edge is considered invalid and ignored.
- *
- * @param edges the data set of edges to be added
- * @return a new graph containing the existing edges plus the newly added edges.
- */
+ * Adds the given list edges to the graph.
+ *
+ * When adding an edge for a non-existing set of vertices,
+ * the edge is considered invalid and ignored.
+ *
+ * @param edges the data set of edges to be added
+ * @return a new graph containing the existing edges plus the newly added edges.
+ */
 def addEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] = {
 wrapGraph(jgraph.addEdges(edges.asJava))
 }
- /**
+ /**
 * Adds the given edge to the graph. If the source and target vertices do
 * not exist in the graph, they will also be added.
 *
@@ -993,7 +990,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
 wrapGraph(jgraph.removeVertex(vertex))
 }
- /**
+ /**
 * Removes the given vertex and its edges from the graph.
 *
 * @param vertices list of vertices to remove
@@ -1037,12 +1034,13 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
 }
 /**
- * Performs Difference on the vertex and edge sets of the input graphs
- * removes common vertices and edges. If a source/target vertex is removed,
- * its corresponding edge will also be removed
- * @param graph the graph to perform difference with
- * @return a new graph where the common vertices and edges have been removed
- */
+ * Performs Difference on the vertex and edge sets of the input graphs
+ * removes common vertices and edges. If a source/target vertex is removed,
+ * its corresponding edge will also be removed
+ *
+ * @param graph the graph to perform difference with
+ * @return a new graph where the common vertices and edges have been removed
+ */
 def difference(graph: Graph[K, VV, EV]) = {
 wrapGraph(jgraph.difference(graph.getWrappedGraph))
 }
@@ -1135,36 +1133,34 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
 * Runs a scatter-gather iteration on the graph.
 * No configuration options are provided.
 *
- * @param vertexUpdateFunction the vertex update function
- * @param messagingFunction the messaging function
+ * @param scatterFunction the scatter function
+ * @param gatherFunction the gather function
 * @param maxIterations maximum number of iterations to perform
- *
 * @return the updated Graph after the scatter-gather iteration has converged or
 * after maximumNumberOfIterations.
 */
- def runScatterGatherIteration[M](vertexUpdateFunction: VertexUpdateFunction[K, VV, M],
- messagingFunction: MessagingFunction[K, VV, M, EV],
+ def runScatterGatherIteration[M](scatterFunction: ScatterFunction[K, VV, M, EV],
+ gatherFunction: SpargelGatherFunction[K, VV, M],
 maxIterations: Int): Graph[K, VV, EV] = {
- wrapGraph(jgraph.runScatterGatherIteration(vertexUpdateFunction, messagingFunction,
+ wrapGraph(jgraph.runScatterGatherIteration(scatterFunction, gatherFunction,
 maxIterations))
 }
 /**
 * Runs a scatter-gather iteration on the graph with configuration options.
 *
- * @param vertexUpdateFunction the vertex update function
- * @param messagingFunction the messaging function
+ * @param scatterFunction the scatter function
+ * @param gatherFunction the gather function
 * @param maxIterations maximum number of iterations to perform
 * @param parameters the iteration configuration parameters
- *
 * @return the updated Graph after the scatter-gather iteration has converged or
 * after maximumNumberOfIterations.
 */
- def runScatterGatherIteration[M](vertexUpdateFunction: VertexUpdateFunction[K, VV, M],
- messagingFunction: MessagingFunction[K, VV, M, EV],
+ def runScatterGatherIteration[M](scatterFunction: ScatterFunction[K, VV, M, EV],
+ gatherFunction: SpargelGatherFunction[K, VV, M],
 maxIterations: Int, parameters: ScatterGatherConfiguration): Graph[K, VV, EV] = {
- wrapGraph(jgraph.runScatterGatherIteration(vertexUpdateFunction, messagingFunction,
+ wrapGraph(jgraph.runScatterGatherIteration(scatterFunction, gatherFunction,
 maxIterations, parameters))
 }
@@ -1178,11 +1174,10 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
 * @param applyFunction the apply function updates the vertex values with the aggregates
 * @param maxIterations maximum number of iterations to perform
 * @tparam M the intermediate type used between gather, sum and apply
- *
 * @return the updated Graph after the gather-sum-apply iteration has converged or
 * after maximumNumberOfIterations.
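 */
- def runGatherSumApplyIteration[M](gatherFunction: GatherFunction[VV, EV, M], sumFunction:
+ def runGatherSumApplyIteration[M](gatherFunction: GSAGatherFunction[VV, EV, M], sumFunction:
 SumFunction[VV, EV, M], applyFunction: ApplyFunction[K, VV, M], maxIterations: Int): Graph[K, VV, EV] = {
 wrapGraph(jgraph.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction,
@@ -1199,25 +1194,23 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
 * @param maxIterations maximum number of iterations to perform
 * @param parameters the iteration configuration parameters
 * @tparam M the intermediate type used between gather, sum and apply
- *
 * @return the updated Graph after the gather-sum-apply iteration has converged or
 * after maximumNumberOfIterations.
 */
- def runGatherSumApplyIteration[M](gatherFunction: GatherFunction[VV, EV, M], sumFunction:
+ def runGatherSumApplyIteration[M](gatherFunction: GSAGatherFunction[VV, EV, M], sumFunction:
 SumFunction[VV, EV, M], applyFunction: ApplyFunction[K, VV, M], maxIterations: Int, parameters: GSAConfiguration): Graph[K, VV, EV] = {
 wrapGraph(jgraph.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction,
 maxIterations, parameters))
 }
- /**
+ /**
 * Runs a vertex-centric iteration on the graph.
 * No configuration options are provided.
 *
 * @param computeFunction the compute function
 * @param combineFunction the optional message combiner function
 * @param maxIterations maximum number of iterations to perform
- *
 * @return the updated Graph after the vertex-centric iteration has converged or
 * after maximumNumberOfIterations.
 */
@@ -1235,7 +1228,6 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
 * @param combineFunction the optional message combiner function
 * @param maxIterations maximum number of iterations to perform
 * @param parameters the iteration configuration parameters
- *
 * @return the updated Graph after the vertex-centric iteration has converged or
 * after maximumNumberOfIterations.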
 */
http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index fe59283..821b0a7 100755
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -45,17 +45,15 @@ import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.asm.translate.TranslateVertexValues;
 import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GSAConfiguration;
-import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.GatherSumApplyIteration;
 import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.pregel.ComputeFunction;
 import org.apache.flink.graph.pregel.MessageCombiner;
 import org.apache.flink.graph.pregel.VertexCentricConfiguration;
 import org.apache.flink.graph.pregel.VertexCentricIteration;
-import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
 import org.apache.flink.graph.spargel.ScatterGatherIteration;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.graph.utils.EdgeToTuple3Map;
 import org.apache.flink.graph.utils.Tuple2ToVertexMap;
 import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
@@ -1652,27 +1650,27 @@ public class Graph<K, VV, EV> {
 * Runs a ScatterGather iteration on the graph.
 * No configuration options are provided.
 *
- * @param vertexUpdateFunction the vertex update function
- * @param messagingFunction the messaging function
+ * @param scatterFunction the scatter function
+ * @param gatherFunction the gather function
 * @param maximumNumberOfIterations maximum number of iterations to perform
 *
 * @return the updated Graph after the scatter-gather iteration has converged or
 * after maximumNumberOfIterations.
 */
 public <M> Graph<K, VV, EV> runScatterGatherIteration(
- VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
- MessagingFunction<K, VV, M, EV> messagingFunction,
+ ScatterFunction<K, VV, M, EV> scatterFunction,
+ org.apache.flink.graph.spargel.GatherFunction<K, VV, M> gatherFunction,
 int maximumNumberOfIterations) {
- return this.runScatterGatherIteration(vertexUpdateFunction, messagingFunction,
+ return this.runScatterGatherIteration(scatterFunction, gatherFunction,
 maximumNumberOfIterations, null);
 }
 /**
 * Runs a ScatterGather iteration on the graph with configuration options.
- *
- * @param vertexUpdateFunction the vertex update function
- * @param messagingFunction the messaging function
+ *
+ * @param scatterFunction the scatter function
+ * @param gatherFunction the gather function
 * @param maximumNumberOfIterations maximum number of iterations to perform
 * @param parameters the iteration configuration parameters
 *
@@ -1680,12 +1678,12 @@ public class Graph<K, VV, EV> {
 * after maximumNumberOfIterations.
 */
 public <M> Graph<K, VV, EV> runScatterGatherIteration(
- VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
- MessagingFunction<K, VV, M, EV> messagingFunction,
+ ScatterFunction<K, VV, M, EV> scatterFunction,
+ org.apache.flink.graph.spargel.GatherFunction<K, VV, M> gatherFunction,
 int maximumNumberOfIterations,
 ScatterGatherConfiguration parameters) {
 ScatterGatherIteration<K, VV, M, EV> iteration = ScatterGatherIteration.withEdges(
- edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations);
+ edges, scatterFunction, gatherFunction, maximumNumberOfIterations);
 iteration.configure(parameters);
@@ -1708,7 +1706,7 @@ public class Graph<K, VV, EV> {
 * after maximumNumberOfIterations.
 */
 public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
- GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
+ org.apache.flink.graph.gsa.GatherFunction gatherFunction, SumFunction<VV, EV, M> sumFunction,
 ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations) {
 return this.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction,
@@ -1729,7 +1727,7 @@ public class Graph<K, VV, EV> {
 * after maximumNumberOfIterations.
 */
 public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
- GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
+ org.apache.flink.graph.gsa.GatherFunction gatherFunction, SumFunction<VV, EV, M> sumFunction,
 ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations, GSAConfiguration parameters) {
http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
index 7578420..0b98a27 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.util.Preconditions;
 /**
@@ -133,8 +134,8 @@ public abstract class IterationConfiguration {
 /**
 * Registers a new aggregator. Aggregators registered here are available during the execution of the vertex updates
- * via {@link org.apache.flink.graph.spargel.VertexUpdateFunction#getIterationAggregator(String)} and
- * {@link org.apache.flink.graph.spargel.VertexUpdateFunction#getPreviousIterationAggregate(String)}.
+ * via {@link GatherFunction#getIterationAggregator(String)} and
+ * {@link GatherFunction#getPreviousIterationAggregate(String)}.
 *
 * @param name The name of the aggregator, used to retrieve it and its aggregates during execution.
 * @param aggregator The aggregator.
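After this change, `runScatterGatherIteration` takes the `ScatterFunction` first and the `GatherFunction` second, so the arguments follow the scatter-then-gather order of the model. The plain-Java sketch below shows that order without any Flink dependency. `Scatter`, `Gather`, `ScatterGatherSketch` and `runScatterGather` are hypothetical names, not Gelly API. The sketch also assumes, as a simplification, that only vertices updated in the previous superstep send messages. The example computes connected components by minimum-label propagation, following the pattern of the `CCMessenger`/`CCUpdater` pair changed in this commit.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical scatter step (not Gelly's ScatterFunction): emits messages for one vertex. */
interface Scatter<V, M> {
    void sendMessages(long vertexId, V value, List<Long> neighbors, Map<Long, List<M>> outbox);
}

/** Hypothetical gather step (not Gelly's GatherFunction): returns a new value, or null if unchanged. */
interface Gather<V, M> {
    V updateVertex(long vertexId, V value, List<M> messages);
}

final class ScatterGatherSketch {

    private ScatterGatherSketch() {}

    /** Scatter function first, gather function second, mirroring the new argument order. */
    static <V, M> Map<Long, V> runScatterGather(Map<Long, V> vertices, Map<Long, List<Long>> adjacency,
            Scatter<V, M> scatter, Gather<V, M> gather, int maxIterations) {
        Map<Long, V> values = new HashMap<>(vertices);
        Set<Long> active = new HashSet<>(values.keySet());
        for (int superstep = 1; superstep <= maxIterations && !active.isEmpty(); superstep++) {
            // Scatter phase (assumption): only vertices changed in the previous superstep send messages.
            Map<Long, List<M>> inbox = new HashMap<>();
            for (long id : active) {
                List<Long> neighbors = adjacency.getOrDefault(id, Collections.emptyList());
                scatter.sendMessages(id, values.get(id), neighbors, inbox);
            }
            // Gather phase: each vertex that received messages may adopt a new value.
            Set<Long> updated = new HashSet<>();
            for (Map.Entry<Long, List<M>> entry : inbox.entrySet()) {
                long id = entry.getKey();
                V newValue = gather.updateVertex(id, values.get(id), entry.getValue());
                if (newValue != null) {
                    values.put(id, newValue);
                    updated.add(id);
                }
            }
            active = updated;
        }
        return values;
    }

    /** Min-label connected components. Every neighbor id must also be a key of the map. */
    static Map<Long, Long> connectedComponents(Map<Long, List<Long>> undirectedAdjacency, int maxIterations) {
        Map<Long, Long> initial = new HashMap<>();
        for (long id : undirectedAdjacency.keySet()) {
            initial.put(id, id); // each vertex starts with its own id as its label
        }
        // Scatter: send the current label to every neighbor.
        Scatter<Long, Long> messenger = (id, label, neighbors, outbox) -> {
            for (long n : neighbors) {
                outbox.computeIfAbsent(n, k -> new ArrayList<>()).add(label);
            }
        };
        // Gather: keep the smallest label seen; return null when nothing changed.
        Gather<Long, Long> updater = (id, label, messages) -> {
            long min = label;
            for (long m : messages) {
                min = Math.min(min, m);
            }
            return min < label ? Long.valueOf(min) : null;
        };
        return runScatterGather(initial, undirectedAdjacency, messenger, updater, maxIterations);
    }
}
```

For the undirected adjacency map {1: [2], 2: [1, 3], 3: [2], 4: [5], 5: [4]}, `connectedComponents(adj, 10)` labels vertices 1, 2 and 3 with 1, and vertices 4 and 5 with 4. The loop stops after three supersteps because no vertex changes in the third.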
http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
index 4d68661..f554680 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
@@ -26,9 +26,9 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import java.util.Map;
 import java.util.TreeMap;
@@ -73,19 +73,33 @@ public class CommunityDetection<K> implements GraphAlgorithm<K, Long, Double, Gr
 public Graph<K, Long, Double> run(Graph<K, Long, Double> graph) {
 DataSet<Vertex<K, Tuple2<Long, Double>>> initializedVertices = graph.getVertices()
- .map(new AddScoreToVertexValuesMapper<K>());
+ .map(new AddScoreToVertexValuesMapper<K>());
 Graph<K, Tuple2<Long, Double>, Double> graphWithScoredVertices =
- Graph.fromDataSet(initializedVertices, graph.getEdges(), graph.getContext()).getUndirected();
+ Graph.fromDataSet(initializedVertices, graph.getEdges(), graph.getContext()).getUndirected();
- return graphWithScoredVertices.runScatterGatherIteration(new VertexLabelUpdater<K>(delta),
- new LabelMessenger<K>(), maxIterations)
+ return graphWithScoredVertices.runScatterGatherIteration(new LabelMessenger<K>(),
+ new VertexLabelUpdater<K>(delta), maxIterations)
 .mapVertices(new RemoveScoreFromVertexValuesMapper<K>());
 }
 @SuppressWarnings("serial")
- public static final class VertexLabelUpdater<K> extends VertexUpdateFunction<
- K, Tuple2<Long, Double>, Tuple2<Long, Double>> {
+ public static final class LabelMessenger<K> extends ScatterFunction<K, Tuple2<Long, Double>,
+ Tuple2<Long, Double>, Double> {
+
+ @Override
+ public void sendMessages(Vertex<K, Tuple2<Long, Double>> vertex) throws Exception {
+
+ for(Edge<K, Double> edge : getEdges()) {
+ sendMessageTo(edge.getTarget(), new Tuple2<Long, Double>(vertex.getValue().f0,
+ vertex.getValue().f1 * edge.getValue()));
+ }
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static final class VertexLabelUpdater<K> extends GatherFunction<
+ K, Tuple2<Long, Double>, Tuple2<Long, Double>> {
 private Double delta;
@@ -154,27 +168,13 @@ public class CommunityDetection<K> implements GraphAlgorithm<K, Long, Double, Gr
 }
 @SuppressWarnings("serial")
- public static final class LabelMessenger<K> extends MessagingFunction<K, Tuple2<Long, Double>,
- Tuple2<Long, Double>, Double> {
-
- @Override
- public void sendMessages(Vertex<K, Tuple2<Long, Double>> vertex) throws Exception {
-
- for(Edge<K, Double> edge : getEdges()) {
- sendMessageTo(edge.getTarget(), new Tuple2<Long, Double>(vertex.getValue().f0,
- vertex.getValue().f1 * edge.getValue()));
- }
- }
- }
-
- @SuppressWarnings("serial")
 @ForwardedFields("f0")
 public static final class AddScoreToVertexValuesMapper<K> implements MapFunction<
 Vertex<K, Long>, Vertex<K, Tuple2<Long, Double>>> {
 public Vertex<K, Tuple2<Long, Double>> map(Vertex<K, Long> vertex) {
 return new Vertex<K, Tuple2<Long, Double>>(
- vertex.getId(), new Tuple2<Long, Double>(vertex.getValue(), 1.0));
+ vertex.getId(), new Tuple2<Long, Double>(vertex.getValue(), 1.0));
 }
 }
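`LabelMessenger`, which now extends `ScatterFunction`, forwards each vertex's (label, score) pair along its edges and scales the score by the edge value. The plain-Java sketch below covers that single scatter step. `LabelScore`, `WeightedEdge` and `LabelMessengerSketch` are hypothetical stand-ins for the Flink `Tuple2`, `Edge` and function classes. The gather side (`VertexLabelUpdater`) is left out because its body is elided from this diff.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the Tuple2<Long, Double> message: (community label, score). */
final class LabelScore {
    final long label;
    final double score;

    LabelScore(long label, double score) {
        this.label = label;
        this.score = score;
    }
}

/** Hypothetical stand-in for an Edge<K, Double>: target id and edge weight. */
final class WeightedEdge {
    final long target;
    final double weight;

    WeightedEdge(long target, double weight) {
        this.target = target;
        this.weight = weight;
    }
}

final class LabelMessengerSketch {

    private LabelMessengerSketch() {}

    /**
     * Scatter step for one vertex: for every edge, send (label, score * edgeWeight)
     * to the edge target. Returns the messages grouped by target id.
     */
    static Map<Long, List<LabelScore>> sendMessages(long label, double score, List<WeightedEdge> edges) {
        Map<Long, List<LabelScore>> outbox = new LinkedHashMap<>();
        for (WeightedEdge edge : edges) {
            outbox.computeIfAbsent(edge.target, k -> new ArrayList<>())
                    .add(new LabelScore(label, score * edge.weight));
        }
        return outbox;
    }
}
```

Consider a vertex with label 7 and score 0.5 that has edges to vertex 2 (weight 0.4) and vertex 3 (weight 1.0). The sketch sends (7, 0.2) to vertex 2 and (7, 0.5) to vertex 3.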
http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
index efc32c1..3cd8f05 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
@@ -25,9 +25,9 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.utils.NullValueEdgeMapper;
 import org.apache.flink.types.NullValue;
@@ -76,39 +76,16 @@ public class ConnectedComponents<K, VV extends Comparable<VV>, EV>
 .getUndirected();
 return undirectedGraph.runScatterGatherIteration(
- new CCUpdater<K, VV>(),
 new CCMessenger<K, VV>(valueTypeInfo),
+ new CCUpdater<K, VV>(),
 maxIterations).getVertices();
 }
 /**
- * Updates the value of a vertex by picking the minimum neighbor value out of all the incoming messages.
- */
- public static final class CCUpdater<K, VV extends Comparable<VV>>
- extends VertexUpdateFunction<K, VV, VV> {
-
- @Override
- public void updateVertex(Vertex<K, VV> vertex, MessageIterator<VV> messages) throws Exception {
- VV current = vertex.getValue();
- VV min = current;
-
- for (VV msg : messages) {
- if (msg.compareTo(min) < 0) {
- min = msg;
- }
- }
-
- if (!min.equals(current)) {
- setNewVertexValue(min);
- }
- }
- }
-
- /**
 * Sends the current vertex value to all adjacent vertices.
 */
 public static final class CCMessenger<K, VV extends Comparable<VV>>
- extends MessagingFunction<K, VV, VV, NullValue>
+ extends ScatterFunction<K, VV, VV, NullValue>
 implements ResultTypeQueryable<VV> {
 private final TypeInformation<VV> typeInformation;
@@ -128,4 +105,27 @@ public class ConnectedComponents<K, VV extends Comparable<VV>, EV>
 return typeInformation;
 }
 }
+
+ /**
+ * Updates the value of a vertex by picking the minimum neighbor value out of all the incoming messages.
+ */
+ public static final class CCUpdater<K, VV extends Comparable<VV>>
+ extends GatherFunction<K, VV, VV> {
+
+ @Override
+ public void updateVertex(Vertex<K, VV> vertex, MessageIterator<VV> messages) throws Exception {
+ VV current = vertex.getValue();
+ VV min = current;
+
+ for (VV msg : messages) {
+ if (msg.compareTo(min) < 0) {
+ min = msg;
+ }
+ }
+
+ if (!min.equals(current)) {
+ setNewVertexValue(min);
+ }
+ }
+ }
 }
http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
index a12cb20..327de73 100755
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
@@ -27,8 +27,8 @@ import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.utils.NullValueEdgeMapper;
 import org.apache.flink.types.NullValue;
http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
index 5899fa0..f39d858 100755
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
@@ -25,8 +25,8 @@ import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.gsa.SumFunction;
 /**
 * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
index fef6808..2d13dfd 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -25,9 +25,9 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.utils.NullValueEdgeMapper;
 import org.apache.flink.types.NullValue;
@@ -78,15 +78,38 @@ public class LabelPropagation<K, VV extends Comparable<VV>, EV>
 return input
 .mapEdges(new NullValueEdgeMapper<K, EV>())
 .runScatterGatherIteration(
- new UpdateVertexLabel<K, VV>(), new SendNewLabelToNeighbors<K, VV>(valueType), maxIterations)
+ new SendNewLabelToNeighbors<K, VV>(valueType), new UpdateVertexLabel<K, VV>(), maxIterations)
 .getVertices();
 }
 /**
+ * Sends the vertex label to all out-neighbors
+ */
+ public static final class SendNewLabelToNeighbors<K, VV extends Comparable<VV>>
+ extends ScatterFunction<K, VV, VV, NullValue>
+ implements ResultTypeQueryable<VV> {
+
+ private final TypeInformation<VV> typeInformation;
+
+ public SendNewLabelToNeighbors(TypeInformation<VV> typeInformation) {
+ this.typeInformation = typeInformation;
+ }
+
+ public void sendMessages(Vertex<K, VV> vertex) {
+ sendMessageToAllNeighbors(vertex.getValue());
+ }
+
+ @Override
+ public TypeInformation<VV> getProducedType() {
+ return typeInformation;
+ }
+ }
+
+ /**
 * Function that updates the value of a vertex by adopting the most frequent
 * label among its in-neighbors
 */
- public static final class UpdateVertexLabel<K, VV extends Comparable<VV>> extends VertexUpdateFunction<K, VV, VV> {
+ public static final class UpdateVertexLabel<K, VV extends Comparable<VV>> extends GatherFunction<K, VV, VV> {
 public void updateVertex(Vertex<K, VV> vertex, MessageIterator<VV> inMessages) {
 Map<VV, Long> labelsWithFrequencies = new HashMap<VV, Long>();
@@ -119,27 +142,4 @@ public class LabelPropagation<K, VV extends Comparable<VV>, EV>
 setNewVertexValue(mostFrequentLabel);
 }
 }
-
- /**
- * Sends the vertex label to all out-neighbors
- */
- public static final class SendNewLabelToNeighbors<K, VV extends Comparable<VV>>
- extends MessagingFunction<K, VV, VV, NullValue>
- implements ResultTypeQueryable<VV> {
-
- private final TypeInformation<VV> typeInformation;
-
- public SendNewLabelToNeighbors(TypeInformation<VV> typeInformation) {
- this.typeInformation = typeInformation;
- }
-
- public void sendMessages(Vertex<K, VV> vertex) {
- sendMessageToAllNeighbors(vertex.getValue());
- }
-
- @Override
- public TypeInformation<VV> getProducedType() {
- return typeInformation;
- }
- }
 }
http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index 2f1b03b..bf9b4e9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -25,10 +25,10 @@ import org.apache.flink.graph.EdgeJoinFunction;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.types.LongValue;
 /**
@@ -65,18 +65,37 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
 ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
 parameters.setOptNumVertices(true);
- return networkWithWeights.runScatterGatherIteration(new VertexRankUpdater<K>(beta),
- new RankMessenger<K>(), maxIterations, parameters)
+ return networkWithWeights.runScatterGatherIteration(new RankMessenger<K>(),
+ new VertexRankUpdater<K>(beta), maxIterations, parameters)
 .getVertices();
 }
 /**
+ * Distributes the rank of a vertex among all target vertices according to
+ * the transition probability, which is associated with an edge as the edge
+ * value.
+ */
+ @SuppressWarnings("serial")
+ public static final class RankMessenger<K> extends ScatterFunction<K, Double, Double, Double> {
+ @Override
+ public void sendMessages(Vertex<K, Double> vertex) {
+ if (getSuperstepNumber() == 1) {
+ // initialize vertex ranks
+ vertex.setValue(1.0 / this.getNumberOfVertices());
+ }
+
+ for (Edge<K, Double> edge : getEdges()) {
+ sendMessageTo(edge.getTarget(), vertex.getValue() * edge.getValue());
+ }
+ }
+ }
+
+ /**
 * Function that updates the rank of a vertex by summing up the partial
 * ranks from all incoming messages and then applying the dampening formula.
 */
 @SuppressWarnings("serial")
- public static final class VertexRankUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
-
+ public static final class VertexRankUpdater<K> extends GatherFunction<K, Double, Double> {
 private final double beta;
 public VertexRankUpdater(double beta) {
@@ -96,30 +115,8 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
 }
 }
- /**
- * Distributes the rank of a vertex among all target vertices according to
- * the transition probability, which is associated with an edge as the edge
- * value.
- */
- @SuppressWarnings("serial")
- public static final class RankMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
-
- @Override
- public void sendMessages(Vertex<K, Double> vertex) {
- if (getSuperstepNumber() == 1) {
- // initialize vertex ranks
- vertex.setValue(1.0 / this.getNumberOfVertices());
- }
-
- for (Edge<K, Double> edge : getEdges()) {
- sendMessageTo(edge.getTarget(), vertex.getValue() * edge.getValue());
- }
- }
- }
-
 @SuppressWarnings("serial")
 private static final class InitWeights implements EdgeJoinFunction<Double, LongValue> {
-
 public Double edgeJoin(Double edgeValue, LongValue inputValue) {
 return edgeValue / (double) inputValue.getValue();
 }
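In the reordered PageRank, `RankMessenger` (scatter) sends `rank * edgeValue` along every edge. `InitWeights` divides each edge value by a `LongValue`, presumably the source vertex's out-degree. `VertexRankUpdater` (gather) sums the incoming partial ranks and applies the damping formula. The updater's body is not part of this diff, so the plain-Java sketch below makes several assumptions:

- It uses the textbook formula `beta * sum + (1 - beta) / n`.
- It treats edges as unit-weight, so each share is `rank / outDegree`.
- Vertices that receive no messages keep their previous rank.

`PageRankSketch.superstep` is a hypothetical helper, not Gelly code. The caller supplies initial ranks of `1 / n`, which is what `RankMessenger` sets in superstep 1.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PageRankSketch {

    private PageRankSketch() {}

    /**
     * One scatter-gather superstep of PageRank on an unweighted adjacency list.
     * Scatter: each vertex sends rank / outDegree to every out-neighbor.
     * Gather: each vertex that received messages sets beta * sum + (1 - beta) / n
     * (the textbook damping formula, assumed here rather than taken from Gelly).
     */
    static Map<Long, Double> superstep(Map<Long, Double> ranks, Map<Long, List<Long>> outEdges, double beta) {
        int n = ranks.size();

        // Scatter phase: accumulate the partial ranks each target receives.
        Map<Long, Double> sums = new HashMap<>();
        for (Map.Entry<Long, List<Long>> entry : outEdges.entrySet()) {
            List<Long> targets = entry.getValue();
            if (targets.isEmpty()) {
                continue; // a vertex without out-edges sends nothing
            }
            double share = ranks.get(entry.getKey()) / targets.size();
            for (long target : targets) {
                sums.merge(target, share, Double::sum);
            }
        }

        // Gather phase: apply the damping formula; vertices without messages keep their rank.
        Map<Long, Double> next = new HashMap<>(ranks);
        for (Map.Entry<Long, Double> entry : sums.entrySet()) {
            next.put(entry.getKey(), beta * entry.getValue() + (1 - beta) / n);
        }
        return next;
    }
}
```

As an example, take the graph with edges 1 to {2, 3}, 2 to 3, and 3 to 1. Starting from ranks of 1/3 each with beta = 0.85, one superstep yields roughly 0.333, 0.192 and 0.475 for vertices 1, 2 and 3, and the ranks still sum to 1.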