http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java index 29183e9..4ff4e79 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java @@ -24,9 +24,9 @@ import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.spargel.GatherFunction; import org.apache.flink.graph.spargel.MessageIterator; -import org.apache.flink.graph.spargel.MessagingFunction; -import org.apache.flink.graph.spargel.VertexUpdateFunction; +import org.apache.flink.graph.spargel.ScatterFunction; /** * This is an implementation of the Single-Source-Shortest Paths algorithm, using a scatter-gather iteration. @@ -52,7 +52,7 @@ public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, D public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) { return input.mapVertices(new InitVerticesMapper<K>(srcVertexId)) - .runScatterGatherIteration(new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(), + .runScatterGatherIteration(new MinDistanceMessenger<K>(), new VertexDistanceUpdater<K>(), maxIterations).getVertices(); } @@ -74,12 +74,30 @@ public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, D } /** + * Distributes the minimum distance associated with a given vertex among all + * the target vertices summed up with the edge's value. + * + * @param <K> + */ + public static final class MinDistanceMessenger<K> extends ScatterFunction<K, Double, Double, Double> { + + @Override + public void sendMessages(Vertex<K, Double> vertex) { + if (vertex.getValue() < Double.POSITIVE_INFINITY) { + for (Edge<K, Double> edge : getEdges()) { + sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue()); + } + } + } + } + + /** * Function that updates the value of a vertex by picking the minimum * distance from all incoming messages. * * @param <K> */ - public static final class VertexDistanceUpdater<K> extends VertexUpdateFunction<K, Double, Double> { + public static final class VertexDistanceUpdater<K> extends GatherFunction<K, Double, Double> { @Override public void updateVertex(Vertex<K, Double> vertex, @@ -98,22 +116,4 @@ public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, D } } } - - /** - * Distributes the minimum distance associated with a given vertex among all - * the target vertices summed up with the edge's value. - * - * @param <K> - */ - public static final class MinDistanceMessenger<K> extends MessagingFunction<K, Double, Double, Double> { - - @Override - public void sendMessages(Vertex<K, Double> vertex) { - if (vertex.getValue() < Double.POSITIVE_INFINITY) { - for (Edge<K, Double> edge : getEdges()) { - sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue()); - } - } - } - } }
http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java index 8272d8f..681d060 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java @@ -18,11 +18,11 @@ package org.apache.flink.graph.library; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; import org.apache.flink.api.java.DataSet; http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java new file mode 100644 index 0000000..d56c0da --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.spargel; + +import org.apache.flink.api.common.aggregators.Aggregator; +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.Value; +import org.apache.flink.util.Collector; + +import java.io.Serializable; +import java.util.Collection; + +/** + * This class must be extended by functions that compute the state of the vertex depending on the old state and the + * incoming messages. The central method is {@link #updateVertex(Vertex, MessageIterator)}, which is + * invoked once per vertex per superstep. + * + * {@code <K>} The vertex key type. + * {@code <VV>} The vertex value type. + * {@code <Message>} The message type. + */ +public abstract class GatherFunction<K, VV, Message> implements Serializable { + + private static final long serialVersionUID = 1L; + + // -------------------------------------------------------------------------------------------- + // Attributes that allow vertices to access their in/out degrees and the total number of vertices + // inside an iteration. + // -------------------------------------------------------------------------------------------- + + private long numberOfVertices = -1L; + + /** + * Retrieves the number of vertices in the graph. + * @return the number of vertices if the {@link org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)} + * option has been set; -1 otherwise. + */ + public long getNumberOfVertices() { + return numberOfVertices; + } + + void setNumberOfVertices(long numberOfVertices) { + this.numberOfVertices = numberOfVertices; + } + + //--------------------------------------------------------------------------------------------- + + private boolean optDegrees; + + boolean isOptDegrees() { + return optDegrees; + } + + void setOptDegrees(boolean optDegrees) { + this.optDegrees = optDegrees; + } + + // -------------------------------------------------------------------------------------------- + // Public API Methods + // -------------------------------------------------------------------------------------------- + + /** + * This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as + * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex + * state is changed, it will trigger the sending of messages via the {@link ScatterFunction}. + * + * @param vertex The vertex. + * @param inMessages The incoming messages to this vertex. + * + * @throws Exception The computation may throw exceptions, which causes the superstep to fail. + */ + public abstract void updateVertex(Vertex<K, VV> vertex, MessageIterator<Message> inMessages) throws Exception; + + /** + * This method is executed once per superstep before the gather function is invoked for each vertex. + * + * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail. + */ + public void preSuperstep() throws Exception {} + + /** + * This method is executed once per superstep after the gather function has been invoked for each vertex. + * + * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail. + */ + public void postSuperstep() throws Exception {} + + /** + * Sets the new value of this vertex. Setting a new value triggers the sending of outgoing messages from this vertex. + * + * This should be called at most once per updateVertex. + * + * @param newValue The new vertex value. + */ + public void setNewVertexValue(VV newValue) { + if(setNewVertexValueCalled) { + throw new IllegalStateException("setNewVertexValue should only be called at most once per updateVertex"); + } + setNewVertexValueCalled = true; + if(isOptDegrees()) { + outValWithDegrees.f1.f0 = newValue; + outWithDegrees.collect(outValWithDegrees); + } else { + outVal.setValue(newValue); + out.collect(outVal); + } + } + + /** + * Gets the number of the superstep, starting at <tt>1</tt>. + * + * @return The number of the current superstep. + */ + public int getSuperstepNumber() { + return this.runtimeContext.getSuperstepNumber(); + } + + /** + * Gets the iteration aggregator registered under the given name. The iteration aggregator combines + * all aggregates globally once per superstep and makes them available in the next superstep. + * + * @param name The name of the aggregator. + * @return The aggregator registered under this name, or null, if no aggregator was registered. + */ + public <T extends Aggregator<?>> T getIterationAggregator(String name) { + return this.runtimeContext.<T>getIterationAggregator(name); + } + + /** + * Get the aggregated value that an aggregator computed in the previous iteration. + * + * @param name The name of the aggregator. + * @return The aggregated value of the previous iteration. + */ + public <T extends Value> T getPreviousIterationAggregate(String name) { + return this.runtimeContext.<T>getPreviousIterationAggregate(name); + } + + /** + * Gets the broadcast data set registered under the given name. Broadcast data sets + * are available on all parallel instances of a function. They can be registered via + * {@link org.apache.flink.graph.spargel.ScatterGatherConfiguration#addBroadcastSetForGatherFunction(String, org.apache.flink.api.java.DataSet)}. + * + * @param name The name under which the broadcast set is registered. + * @return The broadcast data set. + */ + public <T> Collection<T> getBroadcastSet(String name) { + return this.runtimeContext.<T>getBroadcastVariable(name); + } + + // -------------------------------------------------------------------------------------------- + // internal methods + // -------------------------------------------------------------------------------------------- + + private IterationRuntimeContext runtimeContext; + + private Collector<Vertex<K, VV>> out; + + private Collector<Vertex<K, Tuple3<VV, Long, Long>>> outWithDegrees; + + private Vertex<K, VV> outVal; + + private Vertex<K, Tuple3<VV, Long, Long>> outValWithDegrees; + + private long inDegree = -1; + + private long outDegree = -1; + + private boolean setNewVertexValueCalled; + + void init(IterationRuntimeContext context) { + this.runtimeContext = context; + } + + void setOutput(Vertex<K, VV> outVal, Collector<Vertex<K, VV>> out) { + this.outVal = outVal; + this.out = out; + setNewVertexValueCalled = false; + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + <ValueWithDegree> void setOutputWithDegrees(Vertex<K, ValueWithDegree> outVal, + Collector out) { + this.outValWithDegrees = (Vertex<K, Tuple3<VV, Long, Long>>) outVal; + this.outWithDegrees = out; + setNewVertexValueCalled = false; + } + + /** + * Retrieves the vertex in-degree (number of in-coming edges). + * @return The in-degree of this vertex + */ + public long getInDegree() { + return inDegree; + } + + void setInDegree(long inDegree) { + this.inDegree = inDegree; + } + + /** + * Retrieve the vertex out-degree (number of out-going edges). + * @return The out-degree of this vertex + */ + public long getOutDegree() { + return outDegree; + } + + void setOutDegree(long outDegree) { + this.outDegree = outDegree; + } + + /** + * In order to hide the Tuple3(actualValue, inDegree, OutDegree) vertex value from the user, + * another function will be called from {@link org.apache.flink.graph.spargel.ScatterGatherIteration}. + * + * This function will retrieve the vertex from the vertexState and will set its degrees, afterwards calling + * the regular updateVertex function. + * + * @param vertexState + * @param inMessages + * @throws Exception + */ + @SuppressWarnings("unchecked") + <VertexWithDegree> void updateVertexFromScatterGatherIteration(Vertex<K, VertexWithDegree> vertexState, + MessageIterator<Message> inMessages) throws Exception { + + Vertex<K, VV> vertex = new Vertex<K, VV>(vertexState.f0, + ((Tuple3<VV, Long, Long>)vertexState.getValue()).f0); + + updateVertex(vertex, inMessages); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java deleted file mode 100644 index e12d779..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java +++ /dev/null @@ -1,338 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.spargel; - -import java.io.Serializable; -import java.util.Collection; -import java.util.Iterator; - -import org.apache.flink.api.common.aggregators.Aggregator; -import org.apache.flink.api.common.functions.IterationRuntimeContext; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.EdgeDirection; -import org.apache.flink.graph.Vertex; -import org.apache.flink.types.Value; -import org.apache.flink.util.Collector; - -/** - * The base class for functions that produce messages between vertices as a part of a {@link ScatterGatherIteration}. - * - * @param <K> The type of the vertex key (the vertex identifier). - * @param <VV> The type of the vertex value (the state of the vertex). - * @param <Message> The type of the message sent between vertices along the edges. - * @param <EV> The type of the values that are associated with the edges. - */ -public abstract class MessagingFunction<K, VV, Message, EV> implements Serializable { - - private static final long serialVersionUID = 1L; - - // -------------------------------------------------------------------------------------------- - // Attributes that allow vertices to access their in/out degrees and the total number of vertices - // inside an iteration. - // -------------------------------------------------------------------------------------------- - - private long numberOfVertices = -1L; - - /** - * Retrieves the number of vertices in the graph. - * @return the number of vertices if the {@link org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)} - * option has been set; -1 otherwise. - */ - public long getNumberOfVertices() { - return numberOfVertices; - } - - void setNumberOfVertices(long numberOfVertices) { - this.numberOfVertices = numberOfVertices; - } - - // -------------------------------------------------------------------------------------------- - // Attribute that allows the user to choose the neighborhood type(in/out/all) on which to run - // the scatter gather iteration. - // -------------------------------------------------------------------------------------------- - - private EdgeDirection direction; - - /** - * Retrieves the edge direction in which messages are propagated in the scatter-gather iteration. - * @return the messaging {@link EdgeDirection} - */ - public EdgeDirection getDirection() { - return direction; - } - - void setDirection(EdgeDirection direction) { - this.direction = direction; - } - - // -------------------------------------------------------------------------------------------- - // Public API Methods - // -------------------------------------------------------------------------------------------- - - /** - * This method is invoked once per superstep for each vertex that was changed in that superstep. - * It needs to produce the messages that will be received by vertices in the next superstep. - * - * @param vertex The vertex that was changed. - * - * @throws Exception The computation may throw exceptions, which causes the superstep to fail. - */ - public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception; - - /** - * This method is executed once per superstep before the vertex update function is invoked for each vertex. - * - * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail. - */ - public void preSuperstep() throws Exception {} - - /** - * This method is executed once per superstep after the vertex update function has been invoked for each vertex. - * - * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail. - */ - public void postSuperstep() throws Exception {} - - - /** - * Gets an {@link java.lang.Iterable} with all edges. This method is mutually exclusive with - * {@link #sendMessageToAllNeighbors(Object)} and may be called only once. - * <p> - * If the {@link EdgeDirection} is OUT (default), then this iterator contains outgoing edges. - * If the {@link EdgeDirection} is IN, then this iterator contains incoming edges. - * If the {@link EdgeDirection} is ALL, then this iterator contains both outgoing and incoming edges. - * - * @return An iterator with all edges. - */ - @SuppressWarnings("unchecked") - public Iterable<Edge<K, EV>> getEdges() { - if (edgesUsed) { - throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()' exactly once."); - } - edgesUsed = true; - this.edgeIterator.set((Iterator<Edge<K, EV>>) edges); - return this.edgeIterator; - } - - /** - * Sends the given message to all vertices that are targets of an edge of the changed vertex. - * This method is mutually exclusive to the method {@link #getEdges()} and may be called only once. - * <p> - * If the {@link EdgeDirection} is OUT (default), the message will be sent to out-neighbors. - * If the {@link EdgeDirection} is IN, the message will be sent to in-neighbors. - * If the {@link EdgeDirection} is ALL, the message will be sent to all neighbors. - * - * @param m The message to send. - */ - public void sendMessageToAllNeighbors(Message m) { - if (edgesUsed) { - throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()'" - + "exactly once."); - } - - edgesUsed = true; - outValue.f1 = m; - - while (edges.hasNext()) { - Tuple next = (Tuple) edges.next(); - - /* - * When EdgeDirection is OUT, the edges iterator only has the out-edges - * of the vertex, i.e. the ones where this vertex is src. - * next.getField(1) gives the neighbor of the vertex running this MessagingFunction. - */ - if (getDirection().equals(EdgeDirection.OUT)) { - outValue.f0 = next.getField(1); - } - /* - * When EdgeDirection is IN, the edges iterator only has the in-edges - * of the vertex, i.e. the ones where this vertex is trg. - * next.getField(10) gives the neighbor of the vertex running this MessagingFunction. - */ - else if (getDirection().equals(EdgeDirection.IN)) { - outValue.f0 = next.getField(0); - } - // When EdgeDirection is ALL, the edges iterator contains both in- and out- edges - if (getDirection().equals(EdgeDirection.ALL)) { - if (next.getField(0).equals(vertexId)) { - // send msg to the trg - outValue.f0 = next.getField(1); - } - else { - // send msg to the src - outValue.f0 = next.getField(0); - } - } - out.collect(outValue); - } - } - - /** - * Sends the given message to the vertex identified by the given key. If the target vertex does not exist, - * the next superstep will cause an exception due to a non-deliverable message. - * - * @param target The key (id) of the target vertex to message. - * @param m The message. - */ - public void sendMessageTo(K target, Message m) { - outValue.f0 = target; - outValue.f1 = m; - out.collect(outValue); - } - - // -------------------------------------------------------------------------------------------- - - /** - * Gets the number of the superstep, starting at <tt>1</tt>. - * - * @return The number of the current superstep. - */ - public int getSuperstepNumber() { - return this.runtimeContext.getSuperstepNumber(); - } - - /** - * Gets the iteration aggregator registered under the given name. The iteration aggregator combines - * all aggregates globally once per superstep and makes them available in the next superstep. - * - * @param name The name of the aggregator. - * @return The aggregator registered under this name, or null, if no aggregator was registered. - */ - public <T extends Aggregator<?>> T getIterationAggregator(String name) { - return this.runtimeContext.<T>getIterationAggregator(name); - } - - /** - * Get the aggregated value that an aggregator computed in the previous iteration. - * - * @param name The name of the aggregator. - * @return The aggregated value of the previous iteration. - */ - public <T extends Value> T getPreviousIterationAggregate(String name) { - return this.runtimeContext.<T>getPreviousIterationAggregate(name); - } - - /** - * Gets the broadcast data set registered under the given name. Broadcast data sets - * are available on all parallel instances of a function. They can be registered via - * {@link org.apache.flink.graph.spargel.ScatterGatherConfiguration#addBroadcastSetForMessagingFunction(String, org.apache.flink.api.java.DataSet)}. - * - * @param name The name under which the broadcast set is registered. - * @return The broadcast data set. - */ - public <T> Collection<T> getBroadcastSet(String name) { - return this.runtimeContext.<T>getBroadcastVariable(name); - } - - // -------------------------------------------------------------------------------------------- - // internal methods and state - // -------------------------------------------------------------------------------------------- - - private Tuple2<K, Message> outValue; - - private IterationRuntimeContext runtimeContext; - - private Iterator<?> edges; - - private Collector<Tuple2<K, Message>> out; - - private K vertexId; - - private EdgesIterator<K, EV> edgeIterator; - - private boolean edgesUsed; - - private long inDegree = -1; - - private long outDegree = -1; - - void init(IterationRuntimeContext context) { - this.runtimeContext = context; - this.outValue = new Tuple2<K, Message>(); - this.edgeIterator = new EdgesIterator<K, EV>(); - } - - void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out, K id) { - this.edges = edges; - this.out = out; - this.vertexId = id; - this.edgesUsed = false; - } - - private static final class EdgesIterator<K, EV> - implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>> - { - private Iterator<Edge<K, EV>> input; - - private Edge<K, EV> edge = new Edge<K, EV>(); - - void set(Iterator<Edge<K, EV>> input) { - this.input = input; - } - - @Override - public boolean hasNext() { - return input.hasNext(); - } - - @Override - public Edge<K, EV> next() { - Edge<K, EV> next = input.next(); - edge.setSource(next.f0); - edge.setTarget(next.f1); - edge.setValue(next.f2); - return edge; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - @Override - public Iterator<Edge<K, EV>> iterator() { - return this; - } - } - - /** - * Retrieves the vertex in-degree (number of in-coming edges). - * @return The in-degree of this vertex - */ - public long getInDegree() { - return inDegree; - } - - void setInDegree(long inDegree) { - this.inDegree = inDegree; - } - - /** - * Retrieve the vertex out-degree (number of out-going edges). - * @return The out-degree of this vertex - */ - public long getOutDegree() { - return outDegree; - } - - void setOutDegree(long outDegree) { - this.outDegree = outDegree; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java new file mode 100644 index 0000000..336e73d --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.spargel; + +import org.apache.flink.api.common.aggregators.Aggregator; +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.Value; +import org.apache.flink.util.Collector; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Iterator; + +/** + * The base class for functions that produce messages between vertices as a part of a {@link ScatterGatherIteration}. + * + * @param <K> The type of the vertex key (the vertex identifier). + * @param <VV> The type of the vertex value (the state of the vertex). + * @param <Message> The type of the message sent between vertices along the edges. + * @param <EV> The type of the values that are associated with the edges. + */ +public abstract class ScatterFunction<K, VV, Message, EV> implements Serializable { + + private static final long serialVersionUID = 1L; + + // -------------------------------------------------------------------------------------------- + // Attributes that allow vertices to access their in/out degrees and the total number of vertices + // inside an iteration. + // -------------------------------------------------------------------------------------------- + + private long numberOfVertices = -1L; + + /** + * Retrieves the number of vertices in the graph. + * @return the number of vertices if the {@link org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)} + * option has been set; -1 otherwise. + */ + public long getNumberOfVertices() { + return numberOfVertices; + } + + void setNumberOfVertices(long numberOfVertices) { + this.numberOfVertices = numberOfVertices; + } + + // -------------------------------------------------------------------------------------------- + // Attribute that allows the user to choose the neighborhood type(in/out/all) on which to run + // the scatter gather iteration. + // -------------------------------------------------------------------------------------------- + + private EdgeDirection direction; + + /** + * Retrieves the edge direction in which messages are propagated in the scatter-gather iteration. + * @return the messaging {@link EdgeDirection} + */ + public EdgeDirection getDirection() { + return direction; + } + + void setDirection(EdgeDirection direction) { + this.direction = direction; + } + + // -------------------------------------------------------------------------------------------- + // Public API Methods + // -------------------------------------------------------------------------------------------- + + /** + * This method is invoked once per superstep for each vertex that was changed in that superstep. + * It needs to produce the messages that will be received by vertices in the next superstep. + * + * @param vertex The vertex that was changed. + * + * @throws Exception The computation may throw exceptions, which causes the superstep to fail. + */ + public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception; + + /** + * This method is executed once per superstep before the scatter function is invoked for each vertex. + * + * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail. + */ + public void preSuperstep() throws Exception {} + + /** + * This method is executed once per superstep after the scatter function has been invoked for each vertex. + * + * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail. + */ + public void postSuperstep() throws Exception {} + + + /** + * Gets an {@link java.lang.Iterable} with all edges. This method is mutually exclusive with + * {@link #sendMessageToAllNeighbors(Object)} and may be called only once. + * <p> + * If the {@link EdgeDirection} is OUT (default), then this iterator contains outgoing edges. + * If the {@link EdgeDirection} is IN, then this iterator contains incoming edges. + * If the {@link EdgeDirection} is ALL, then this iterator contains both outgoing and incoming edges. + * + * @return An iterator with all edges. + */ + @SuppressWarnings("unchecked") + public Iterable<Edge<K, EV>> getEdges() { + if (edgesUsed) { + throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()' exactly once."); + } + edgesUsed = true; + this.edgeIterator.set((Iterator<Edge<K, EV>>) edges); + return this.edgeIterator; + } + + /** + * Sends the given message to all vertices that are targets of an edge of the changed vertex. + * This method is mutually exclusive to the method {@link #getEdges()} and may be called only once. + * <p> + * If the {@link EdgeDirection} is OUT (default), the message will be sent to out-neighbors. + * If the {@link EdgeDirection} is IN, the message will be sent to in-neighbors. + * If the {@link EdgeDirection} is ALL, the message will be sent to all neighbors. + * + * @param m The message to send. + */ + public void sendMessageToAllNeighbors(Message m) { + if (edgesUsed) { + throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()'" + + "exactly once."); + } + + edgesUsed = true; + outValue.f1 = m; + + while (edges.hasNext()) { + Tuple next = (Tuple) edges.next(); + + /* + * When EdgeDirection is OUT, the edges iterator only has the out-edges + * of the vertex, i.e. the ones where this vertex is src. + * next.getField(1) gives the neighbor of the vertex running this ScatterFunction. + */ + if (getDirection().equals(EdgeDirection.OUT)) { + outValue.f0 = next.getField(1); + } + /* + * When EdgeDirection is IN, the edges iterator only has the in-edges + * of the vertex, i.e. the ones where this vertex is trg. + * next.getField(10) gives the neighbor of the vertex running this ScatterFunction. + */ + else if (getDirection().equals(EdgeDirection.IN)) { + outValue.f0 = next.getField(0); + } + // When EdgeDirection is ALL, the edges iterator contains both in- and out- edges + if (getDirection().equals(EdgeDirection.ALL)) { + if (next.getField(0).equals(vertexId)) { + // send msg to the trg + outValue.f0 = next.getField(1); + } + else { + // send msg to the src + outValue.f0 = next.getField(0); + } + } + out.collect(outValue); + } + } + + /** + * Sends the given message to the vertex identified by the given key. If the target vertex does not exist, + * the next superstep will cause an exception due to a non-deliverable message. + * + * @param target The key (id) of the target vertex to message. + * @param m The message. + */ + public void sendMessageTo(K target, Message m) { + outValue.f0 = target; + outValue.f1 = m; + out.collect(outValue); + } + + // -------------------------------------------------------------------------------------------- + + /** + * Gets the number of the superstep, starting at <tt>1</tt>. + * + * @return The number of the current superstep. + */ + public int getSuperstepNumber() { + return this.runtimeContext.getSuperstepNumber(); + } + + /** + * Gets the iteration aggregator registered under the given name. The iteration aggregator combines + * all aggregates globally once per superstep and makes them available in the next superstep. + * + * @param name The name of the aggregator. + * @return The aggregator registered under this name, or null, if no aggregator was registered. + */ + public <T extends Aggregator<?>> T getIterationAggregator(String name) { + return this.runtimeContext.<T>getIterationAggregator(name); + } + + /** + * Get the aggregated value that an aggregator computed in the previous iteration. + * + * @param name The name of the aggregator. + * @return The aggregated value of the previous iteration. + */ + public <T extends Value> T getPreviousIterationAggregate(String name) { + return this.runtimeContext.<T>getPreviousIterationAggregate(name); + } + + /** + * Gets the broadcast data set registered under the given name. Broadcast data sets + * are available on all parallel instances of a function. They can be registered via + * {@link org.apache.flink.graph.spargel.ScatterGatherConfiguration#addBroadcastSetForScatterFunction(String, org.apache.flink.api.java.DataSet)}. + * + * @param name The name under which the broadcast set is registered. + * @return The broadcast data set. + */ + public <T> Collection<T> getBroadcastSet(String name) { + return this.runtimeContext.<T>getBroadcastVariable(name); + } + + // -------------------------------------------------------------------------------------------- + // internal methods and state + // -------------------------------------------------------------------------------------------- + + private Tuple2<K, Message> outValue; + + private IterationRuntimeContext runtimeContext; + + private Iterator<?> edges; + + private Collector<Tuple2<K, Message>> out; + + private K vertexId; + + private EdgesIterator<K, EV> edgeIterator; + + private boolean edgesUsed; + + private long inDegree = -1; + + private long outDegree = -1; + + void init(IterationRuntimeContext context) { + this.runtimeContext = context; + this.outValue = new Tuple2<K, Message>(); + this.edgeIterator = new EdgesIterator<K, EV>(); + } + + void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out, K id) { + this.edges = edges; + this.out = out; + this.vertexId = id; + this.edgesUsed = false; + } + + private static final class EdgesIterator<K, EV> + implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>> + { + private Iterator<Edge<K, EV>> input; + + private Edge<K, EV> edge = new Edge<K, EV>(); + + void set(Iterator<Edge<K, EV>> input) { + this.input = input; + } + + @Override + public boolean hasNext() { + return input.hasNext(); + } + + @Override + public Edge<K, EV> next() { + Edge<K, EV> next = input.next(); + edge.setSource(next.f0); + edge.setTarget(next.f1); + edge.setValue(next.f2); + return edge; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + @Override + public Iterator<Edge<K, EV>> iterator() { + return this; + } + } + + /** + * Retrieves the vertex in-degree (number of in-coming edges). + * @return The in-degree of this vertex + */ + public long getInDegree() { + return inDegree; + } + + void setInDegree(long inDegree) { + this.inDegree = inDegree; + } + + /** + * Retrieve the vertex out-degree (number of out-going edges). + * @return The out-degree of this vertex + */ + public long getOutDegree() { + return outDegree; + } + + void setOutDegree(long outDegree) { + this.outDegree = outDegree; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java index 3a3de64..4ac1ae1 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java @@ -29,20 +29,20 @@ import java.util.List; /** * A ScatterGatherConfiguration object can be used to set the iteration name and * degree of parallelism, to register aggregators and use broadcast sets in - * the {@link org.apache.flink.graph.spargel.VertexUpdateFunction} and {@link org.apache.flink.graph.spargel.MessagingFunction} + * the {@link GatherFunction} and {@link ScatterFunction} * * The VertexCentricConfiguration object is passed as an argument to * {@link org.apache.flink.graph.Graph#runScatterGatherIteration ( - * org.apache.flink.graph.spargel.VertexUpdateFunction, org.apache.flink.graph.spargel.MessagingFunction, int, + * org.apache.flink.graph.spargel.GatherFunction, org.apache.flink.graph.spargel.ScatterFunction, int, * ScatterGatherConfiguration)}. */ public class ScatterGatherConfiguration extends IterationConfiguration { - /** the broadcast variables for the update function **/ - private List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String,DataSet<?>>>(); + /** the broadcast variables for the scatter function **/ + private List<Tuple2<String, DataSet<?>>> bcVarsScatter = new ArrayList<>(); - /** the broadcast variables for the messaging function **/ - private List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>(); + /** the broadcast variables for the gather function **/ + private List<Tuple2<String, DataSet<?>>> bcVarsGather = new ArrayList<>(); /** flag that defines whether the degrees option is set **/ private boolean optDegrees = false; @@ -53,43 +53,43 @@ public class ScatterGatherConfiguration extends IterationConfiguration { public ScatterGatherConfiguration() {} /** - * Adds a data set as a broadcast set to the messaging function. + * Adds a data set as a broadcast set to the scatter function. * - * @param name The name under which the broadcast data is available in the messaging function. + * @param name The name under which the broadcast data is available in the scatter function. * @param data The data set to be broadcasted. */ - public void addBroadcastSetForMessagingFunction(String name, DataSet<?> data) { - this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, data)); + public void addBroadcastSetForScatterFunction(String name, DataSet<?> data) { + this.bcVarsScatter.add(new Tuple2<String, DataSet<?>>(name, data)); } /** - * Adds a data set as a broadcast set to the vertex update function. + * Adds a data set as a broadcast set to the gather function. * - * @param name The name under which the broadcast data is available in the vertex update function. + * @param name The name under which the broadcast data is available in the gather function. * @param data The data set to be broadcasted. */ - public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) { - this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, data)); + public void addBroadcastSetForGatherFunction(String name, DataSet<?> data) { + this.bcVarsGather.add(new Tuple2<String, DataSet<?>>(name, data)); } /** - * Get the broadcast variables of the VertexUpdateFunction. + * Get the broadcast variables of the ScatterFunction. * * @return a List of Tuple2, where the first field is the broadcast variable name * and the second field is the broadcast DataSet. */ - public List<Tuple2<String, DataSet<?>>> getUpdateBcastVars() { - return this.bcVarsUpdate; + public List<Tuple2<String, DataSet<?>>> getScatterBcastVars() { + return this.bcVarsScatter; } /** - * Get the broadcast variables of the MessagingFunction. + * Get the broadcast variables of the GatherFunction. * * @return a List of Tuple2, where the first field is the broadcast variable name * and the second field is the broadcast DataSet. */ - public List<Tuple2<String, DataSet<?>>> getMessagingBcastVars() { - return this.bcVarsMessaging; + public List<Tuple2<String, DataSet<?>>> getGatherBcastVars() { + return this.bcVarsGather; } /** @@ -113,7 +113,7 @@ public class ScatterGatherConfiguration extends IterationConfiguration { } /** - * Gets the direction in which messages are sent in the MessagingFunction. + * Gets the direction in which messages are sent in the ScatterFunction. * By default the messaging direction is OUT. * * @return an EdgeDirection, which can be either IN, OUT or ALL. @@ -123,7 +123,7 @@ public class ScatterGatherConfiguration extends IterationConfiguration { } /** - * Sets the direction in which messages are sent in the MessagingFunction. + * Sets the direction in which messages are sent in the ScatterFunction. * By default the messaging direction is OUT. * * @param direction - IN, OUT or ALL http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java index fc5c210..fde305f 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java @@ -53,21 +53,21 @@ import java.util.Map; * Scatter-Gather algorithms operate on graphs, which are defined through vertices and edges. The * algorithms send messages along the edges and update the state of vertices based on * the old state and the incoming messages. All vertices have an initial state. - * The computation terminates once no vertex updates it state any more. + * The computation terminates once no vertex updates its state any more. * Additionally, a maximum number of iterations (supersteps) may be specified. * <p> * The computation is here represented by two functions: * <ul> - * <li>The {@link VertexUpdateFunction} receives incoming messages and may updates the state for + * <li>The {@link GatherFunction} receives incoming messages and may updates the state for * the vertex. If a state is updated, messages are sent from this vertex. Initially, all vertices are * considered updated.</li> - * <li>The {@link MessagingFunction} takes the new vertex state and sends messages along the outgoing + * <li>The {@link ScatterFunction} takes the new vertex state and sends messages along the outgoing * edges of the vertex. The outgoing edges may optionally have an associated value, such as a weight.</li> * </ul> * <p> * * Scatter-Gather graph iterations are are run by calling - * {@link Graph#runScatterGatherIteration(VertexUpdateFunction, MessagingFunction, int)}. + * {@link Graph#runScatterGatherIteration(ScatterFunction, GatherFunction, int)}. * * @param <K> The type of the vertex key (the vertex identifier). * @param <VV> The type of the vertex value (the state of the vertex). @@ -77,47 +77,47 @@ import java.util.Map; public class ScatterGatherIteration<K, VV, Message, EV> implements CustomUnaryOperation<Vertex<K, VV>, Vertex<K, VV>> { - private final VertexUpdateFunction<K, VV, Message> updateFunction; + private final ScatterFunction<K, VV, Message, EV> scatterFunction; + + private final GatherFunction<K, VV, Message> gatherFunction; - private final MessagingFunction<K, VV, Message, EV> messagingFunction; - private final DataSet<Edge<K, EV>> edgesWithValue; - + private final int maximumNumberOfIterations; - + private final TypeInformation<Message> messageType; - + private DataSet<Vertex<K, VV>> initialVertices; private ScatterGatherConfiguration configuration; // ---------------------------------------------------------------------------------- - - private ScatterGatherIteration(VertexUpdateFunction<K, VV, Message> uf, - MessagingFunction<K, VV, Message, EV> mf, + + private ScatterGatherIteration(ScatterFunction<K, VV, Message, EV> sf, + GatherFunction<K, VV, Message> gf, DataSet<Edge<K, EV>> edgesWithValue, int maximumNumberOfIterations) { - Preconditions.checkNotNull(uf); - Preconditions.checkNotNull(mf); + Preconditions.checkNotNull(sf); + Preconditions.checkNotNull(gf); Preconditions.checkNotNull(edgesWithValue); Preconditions.checkArgument(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one."); - this.updateFunction = uf; - this.messagingFunction = mf; + this.scatterFunction = sf; + this.gatherFunction = gf; this.edgesWithValue = edgesWithValue; - this.maximumNumberOfIterations = maximumNumberOfIterations; - this.messageType = getMessageType(mf); + this.maximumNumberOfIterations = maximumNumberOfIterations; + this.messageType = getMessageType(sf); } - - private TypeInformation<Message> getMessageType(MessagingFunction<K, VV, Message, EV> mf) { - return TypeExtractor.createTypeInfo(mf, MessagingFunction.class, mf.getClass(), 2); + + private TypeInformation<Message> getMessageType(ScatterFunction<K, VV, Message, EV> mf) { + return TypeExtractor.createTypeInfo(mf, ScatterFunction.class, mf.getClass(), 2); } - + // -------------------------------------------------------------------------------------------- // Custom Operator behavior // -------------------------------------------------------------------------------------------- - + /** * Sets the input data set for this operator. In the case of this operator this input data set represents * the set of vertices with their initial state. @@ -131,7 +131,7 @@ public class ScatterGatherIteration<K, VV, Message, EV> public void setInput(DataSet<Vertex<K, VV>> inputData) { this.initialVertices = inputData; } - + /** * Creates the operator that represents this scatter-gather graph computation. * @@ -145,14 +145,14 @@ public class ScatterGatherIteration<K, VV, Message, EV> // prepare some type information TypeInformation<K> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0); - TypeInformation<Tuple2<K, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<K,Message>>(keyType, messageType); + TypeInformation<Tuple2<K, Message>> messageTypeInfo = new TupleTypeInfo<>(keyType, messageType); // create a graph Graph<K, VV, EV> graph = Graph.fromDataSet(initialVertices, edgesWithValue, initialVertices.getExecutionEnvironment()); // check whether the numVertices option is set and, if so, compute the total number of vertices - // and set it within the messaging and update functions + // and set it within the scatter and gather functions DataSet<LongValue> numberOfVertices = null; if (this.configuration != null && this.configuration.isOptNumVertices()) { @@ -164,13 +164,13 @@ public class ScatterGatherIteration<K, VV, Message, EV> } if(this.configuration != null) { - messagingFunction.setDirection(this.configuration.getDirection()); + scatterFunction.setDirection(this.configuration.getDirection()); } else { - messagingFunction.setDirection(EdgeDirection.OUT); + scatterFunction.setDirection(EdgeDirection.OUT); } // retrieve the direction in which the updates are made and in which the messages are sent - EdgeDirection messagingDirection = messagingFunction.getDirection(); + EdgeDirection messagingDirection = scatterFunction.getDirection(); // check whether the degrees option is set and, if so, compute the in and the out degrees and // add them to the vertex value @@ -186,9 +186,9 @@ public class ScatterGatherIteration<K, VV, Message, EV> * a weight or distance). * * @param edgesWithValue The data set containing edges. - * @param uf The function that updates the state of the vertices from the incoming messages. - * @param mf The function that turns changed vertex states into messages along the edges. - * + * @param sf The function that turns changed vertex states into messages along the edges. + * @param gf The function that updates the state of the vertices from the incoming messages. + * * @param <K> The type of the vertex key (the vertex identifier). * @param <VV> The type of the vertex value (the state of the vertex). * @param <Message> The type of the message sent between vertices along the edges. @@ -196,14 +196,11 @@ public class ScatterGatherIteration<K, VV, Message, EV> * * @return An in stance of the scatter-gather graph computation operator. */ - public static final <K, VV, Message, EV> - ScatterGatherIteration<K, VV, Message, EV> withEdges( - DataSet<Edge<K, EV>> edgesWithValue, - VertexUpdateFunction<K, VV, Message> uf, - MessagingFunction<K, VV, Message, EV> mf, - int maximumNumberOfIterations) + public static final <K, VV, Message, EV> ScatterGatherIteration<K, VV, Message, EV> withEdges( + DataSet<Edge<K, EV>> edgesWithValue, ScatterFunction<K, VV, Message, EV> sf, + GatherFunction<K, VV, Message> gf, int maximumNumberOfIterations) { - return new ScatterGatherIteration<K, VV, Message, EV>(uf, mf, edgesWithValue, maximumNumberOfIterations); + return new ScatterGatherIteration<>(sf, gf, edgesWithValue, maximumNumberOfIterations); } /** @@ -226,23 +223,122 @@ public class ScatterGatherIteration<K, VV, Message, EV> // Wrapping UDFs // -------------------------------------------------------------------------------------------- - private static abstract class VertexUpdateUdf<K, VVWithDegrees, Message> extends RichCoGroupFunction< + /* + * UDF that encapsulates the message sending function for graphs where the edges have an associated value. + */ + private static abstract class ScatterUdfWithEdgeValues<K, VVWithDegrees, VV, Message, EV> + extends RichCoGroupFunction<Edge<K, EV>, Vertex<K, VVWithDegrees>, Tuple2<K, Message>> + implements ResultTypeQueryable<Tuple2<K, Message>> + { + private static final long serialVersionUID = 1L; + + final ScatterFunction<K, VV, Message, EV> scatterFunction; + + private transient TypeInformation<Tuple2<K, Message>> resultType; + + + private ScatterUdfWithEdgeValues(ScatterFunction<K, VV, Message, EV> scatterFunction, + TypeInformation<Tuple2<K, Message>> resultType) + { + this.scatterFunction = scatterFunction; + this.resultType = resultType; + } + + @Override + public void open(Configuration parameters) throws Exception { + if (getRuntimeContext().hasBroadcastVariable("number of vertices")) { + Collection<LongValue> numberOfVertices = getRuntimeContext().getBroadcastVariable("number of vertices"); + this.scatterFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue()); + } + if (getIterationRuntimeContext().getSuperstepNumber() == 1) { + this.scatterFunction.init(getIterationRuntimeContext()); + } + this.scatterFunction.preSuperstep(); + } + + @Override + public void close() throws Exception { + this.scatterFunction.postSuperstep(); + } + + @Override + public TypeInformation<Tuple2<K, Message>> getProducedType() { + return this.resultType; + } + } + + @SuppressWarnings("serial") + private static final class ScatterUdfWithEVsSimpleVV<K, VV, Message, EV> + extends ScatterUdfWithEdgeValues<K, VV, VV, Message, EV> { + + private ScatterUdfWithEVsSimpleVV(ScatterFunction<K, VV, Message, EV> scatterFunction, + TypeInformation<Tuple2<K, Message>> resultType) { + super(scatterFunction, resultType); + } + + @Override + public void coGroup(Iterable<Edge<K, EV>> edges, + Iterable<Vertex<K, VV>> state, + Collector<Tuple2<K, Message>> out) throws Exception { + final Iterator<Vertex<K, VV>> stateIter = state.iterator(); + + if (stateIter.hasNext()) { + Vertex<K, VV> newVertexState = stateIter.next(); + scatterFunction.set(edges.iterator(), out, newVertexState.getId()); + scatterFunction.sendMessages(newVertexState); + } + } + } + + @SuppressWarnings("serial") + private static final class ScatterUdfWithEVsVVWithDegrees<K, VV, Message, EV> + extends ScatterUdfWithEdgeValues<K, Tuple3<VV, LongValue, LongValue>, VV, Message, EV> { + + private Vertex<K, VV> nextVertex = new Vertex<>(); + + private ScatterUdfWithEVsVVWithDegrees(ScatterFunction<K, VV, Message, EV> scatterFunction, + TypeInformation<Tuple2<K, Message>> resultType) { + super(scatterFunction, resultType); + } + + @Override + public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, LongValue, LongValue>>> state, + Collector<Tuple2<K, Message>> out) throws Exception { + + final Iterator<Vertex<K, Tuple3<VV, LongValue, LongValue>>> stateIter = state.iterator(); + + if (stateIter.hasNext()) { + Vertex<K, Tuple3<VV, LongValue, LongValue>> vertexWithDegrees = stateIter.next(); + + nextVertex.setField(vertexWithDegrees.f0, 0); + nextVertex.setField(vertexWithDegrees.f1.f0, 1); + + scatterFunction.setInDegree(vertexWithDegrees.f1.f1.getValue()); + scatterFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue()); + + scatterFunction.set(edges.iterator(), out, vertexWithDegrees.getId()); + scatterFunction.sendMessages(nextVertex); + } + } + } + + private static abstract class GatherUdf<K, VVWithDegrees, Message> extends RichCoGroupFunction< Tuple2<K, Message>, Vertex<K, VVWithDegrees>, Vertex<K, VVWithDegrees>> implements ResultTypeQueryable<Vertex<K, VVWithDegrees>> { private static final long serialVersionUID = 1L; - - final VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction; - final MessageIterator<Message> messageIter = new MessageIterator<Message>(); - + final GatherFunction<K, VVWithDegrees, Message> gatherFunction; + + final MessageIterator<Message> messageIter = new MessageIterator<>(); + private transient TypeInformation<Vertex<K, VVWithDegrees>> resultType; - - - private VertexUpdateUdf(VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction, + + + private GatherUdf(GatherFunction<K, VVWithDegrees, Message> gatherFunction, TypeInformation<Vertex<K, VVWithDegrees>> resultType) { - this.vertexUpdateFunction = vertexUpdateFunction; + this.gatherFunction = gatherFunction; this.resultType = resultType; } @@ -250,17 +346,17 @@ public class ScatterGatherIteration<K, VV, Message, EV> public void open(Configuration parameters) throws Exception { if (getRuntimeContext().hasBroadcastVariable("number of vertices")) { Collection<LongValue> numberOfVertices = getRuntimeContext().getBroadcastVariable("number of vertices"); - this.vertexUpdateFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue()); + this.gatherFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue()); } if (getIterationRuntimeContext().getSuperstepNumber() == 1) { - this.vertexUpdateFunction.init(getIterationRuntimeContext()); + this.gatherFunction.init(getIterationRuntimeContext()); } - this.vertexUpdateFunction.preSuperstep(); + this.gatherFunction.preSuperstep(); } - + @Override public void close() throws Exception { - this.vertexUpdateFunction.postSuperstep(); + this.gatherFunction.postSuperstep(); } @Override @@ -270,10 +366,10 @@ public class ScatterGatherIteration<K, VV, Message, EV> } @SuppressWarnings("serial") - private static final class VertexUpdateUdfSimpleVV<K, VV, Message> extends VertexUpdateUdf<K, VV, Message> { + private static final class GatherUdfSimpleVV<K, VV, Message> extends GatherUdf<K, VV, Message> { - private VertexUpdateUdfSimpleVV(VertexUpdateFunction<K, VV, Message> vertexUpdateFunction, TypeInformation<Vertex<K, VV>> resultType) { - super(vertexUpdateFunction, resultType); + private GatherUdfSimpleVV(GatherFunction<K, VV, Message> gatherFunction, TypeInformation<Vertex<K, VV>> resultType) { + super(gatherFunction, resultType); } @Override @@ -289,8 +385,8 @@ public class ScatterGatherIteration<K, VV, Message, EV> Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator(); messageIter.setSource(downcastIter); - vertexUpdateFunction.setOutput(vertexState, out); - vertexUpdateFunction.updateVertex(vertexState, messageIter); + gatherFunction.setOutput(vertexState, out); + gatherFunction.updateVertex(vertexState, messageIter); } else { final Iterator<Tuple2<K, Message>> messageIter = messages.iterator(); @@ -299,7 +395,7 @@ public class ScatterGatherIteration<K, VV, Message, EV> try { Tuple2<K, Message> next = messageIter.next(); message = "Target vertex '" + next.f0 + "' does not exist!."; - } catch (Throwable t) {} + } catch (Throwable ignored) {} throw new Exception(message); } else { throw new Exception(); @@ -309,31 +405,31 @@ public class ScatterGatherIteration<K, VV, Message, EV> } @SuppressWarnings("serial") - private static final class VertexUpdateUdfVVWithDegrees<K, VV, Message> extends VertexUpdateUdf<K, Tuple3<VV, LongValue, LongValue>, Message> { + private static final class GatherUdfVVWithDegrees<K, VV, Message> extends GatherUdf<K, Tuple3<VV, LongValue, LongValue>, Message> { - private VertexUpdateUdfVVWithDegrees(VertexUpdateFunction<K, Tuple3<VV, LongValue, LongValue>, Message> vertexUpdateFunction, + private GatherUdfVVWithDegrees(GatherFunction<K, Tuple3<VV, LongValue, LongValue>, Message> gatherFunction, TypeInformation<Vertex<K, Tuple3<VV, LongValue, LongValue>>> resultType) { - super(vertexUpdateFunction, resultType); + super(gatherFunction, resultType); } - + @Override public void coGroup(Iterable<Tuple2<K, Message>> messages, Iterable<Vertex<K, Tuple3<VV, LongValue, LongValue>>> vertex, Collector<Vertex<K, Tuple3<VV, LongValue, LongValue>>> out) throws Exception { final Iterator<Vertex<K, Tuple3<VV, LongValue, LongValue>>> vertexIter = vertex.iterator(); - + if (vertexIter.hasNext()) { Vertex<K, Tuple3<VV, LongValue, LongValue>> vertexWithDegrees = vertexIter.next(); - + @SuppressWarnings("unchecked") Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator(); messageIter.setSource(downcastIter); - vertexUpdateFunction.setInDegree(vertexWithDegrees.f1.f1.getValue()); - vertexUpdateFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue()); + gatherFunction.setInDegree(vertexWithDegrees.f1.f1.getValue()); + gatherFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue()); - vertexUpdateFunction.setOutputWithDegrees(vertexWithDegrees, out); - vertexUpdateFunction.updateVertexFromScatterGatherIteration(vertexWithDegrees, messageIter); + gatherFunction.setOutputWithDegrees(vertexWithDegrees, out); + gatherFunction.updateVertexFromScatterGatherIteration(vertexWithDegrees, messageIter); } else { final Iterator<Tuple2<K, Message>> messageIter = messages.iterator(); @@ -342,7 +438,7 @@ public class ScatterGatherIteration<K, VV, Message, EV> try { Tuple2<K, Message> next = messageIter.next(); message = "Target vertex '" + next.f0 + "' does not exist!."; - } catch (Throwable t) {} + } catch (Throwable ignored) {} throw new Exception(message); } else { throw new Exception(); @@ -351,112 +447,12 @@ public class ScatterGatherIteration<K, VV, Message, EV> } } - /* - * UDF that encapsulates the message sending function for graphs where the edges have an associated value. - */ - private static abstract class MessagingUdfWithEdgeValues<K, VVWithDegrees, VV, Message, EV> - extends RichCoGroupFunction<Edge<K, EV>, Vertex<K, VVWithDegrees>, Tuple2<K, Message>> - implements ResultTypeQueryable<Tuple2<K, Message>> - { - private static final long serialVersionUID = 1L; - - final MessagingFunction<K, VV, Message, EV> messagingFunction; - - private transient TypeInformation<Tuple2<K, Message>> resultType; - - - private MessagingUdfWithEdgeValues(MessagingFunction<K, VV, Message, EV> messagingFunction, - TypeInformation<Tuple2<K, Message>> resultType) - { - this.messagingFunction = messagingFunction; - this.resultType = resultType; - } - - @Override - public void open(Configuration parameters) throws Exception { - if (getRuntimeContext().hasBroadcastVariable("number of vertices")) { - Collection<LongValue> numberOfVertices = getRuntimeContext().getBroadcastVariable("number of vertices"); - this.messagingFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue()); - } - if (getIterationRuntimeContext().getSuperstepNumber() == 1) { - this.messagingFunction.init(getIterationRuntimeContext()); - } - this.messagingFunction.preSuperstep(); - } - - @Override - public void close() throws Exception { - this.messagingFunction.postSuperstep(); - } - - @Override - public TypeInformation<Tuple2<K, Message>> getProducedType() { - return this.resultType; - } - } - - @SuppressWarnings("serial") - private static final class MessagingUdfWithEVsSimpleVV<K, VV, Message, EV> - extends MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> { - - private MessagingUdfWithEVsSimpleVV(MessagingFunction<K, VV, Message, EV> messagingFunction, - TypeInformation<Tuple2<K, Message>> resultType) { - super(messagingFunction, resultType); - } - - @Override - public void coGroup(Iterable<Edge<K, EV>> edges, - Iterable<Vertex<K, VV>> state, - Collector<Tuple2<K, Message>> out) throws Exception { - final Iterator<Vertex<K, VV>> stateIter = state.iterator(); - - if (stateIter.hasNext()) { - Vertex<K, VV> newVertexState = stateIter.next(); - messagingFunction.set((Iterator<?>) edges.iterator(), out, newVertexState.getId()); - messagingFunction.sendMessages(newVertexState); - } - } - } - - @SuppressWarnings("serial") - private static final class MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV> - extends MessagingUdfWithEdgeValues<K, Tuple3<VV, LongValue, LongValue>, VV, Message, EV> { - - private Vertex<K, VV> nextVertex = new Vertex<K, VV>(); - - private MessagingUdfWithEVsVVWithDegrees(MessagingFunction<K, VV, Message, EV> messagingFunction, - TypeInformation<Tuple2<K, Message>> resultType) { - super(messagingFunction, resultType); - } - - @Override - public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, LongValue, LongValue>>> state, - Collector<Tuple2<K, Message>> out) throws Exception { - - final Iterator<Vertex<K, Tuple3<VV, LongValue, LongValue>>> stateIter = state.iterator(); - - if (stateIter.hasNext()) { - Vertex<K, Tuple3<VV, LongValue, LongValue>> vertexWithDegrees = stateIter.next(); - - nextVertex.setField(vertexWithDegrees.f0, 0); - nextVertex.setField(vertexWithDegrees.f1.f0, 1); - - messagingFunction.setInDegree(vertexWithDegrees.f1.f1.getValue()); - messagingFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue()); - - messagingFunction.set((Iterator<?>) edges.iterator(), out, vertexWithDegrees.getId()); - messagingFunction.sendMessages(nextVertex); - } - } - } - - // -------------------------------------------------------------------------------------------- // UTIL methods // -------------------------------------------------------------------------------------------- /** - * Method that builds the messaging function using a coGroup operator for a simple vertex(without + * Method that builds the scatter function using a coGroup operator for a simple vertex (without * degrees). * It afterwards configures the function with a custom name and broadcast variables. * @@ -464,17 +460,17 @@ public class ScatterGatherIteration<K, VV, Message, EV> * @param messageTypeInfo * @param whereArg the argument for the where within the coGroup * @param equalToArg the argument for the equalTo within the coGroup - * @return the messaging function + * @return the scatter function */ - private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunction( + private CoGroupOperator<?, ?, Tuple2<K, Message>> buildScatterFunction( DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration, TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg, DataSet<LongValue> numberOfVertices) { - // build the messaging function (co group) + // build the scatter function (co group) CoGroupOperator<?, ?, Tuple2<K, Message>> messages; - MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> messenger = - new MessagingUdfWithEVsSimpleVV<K, VV, Message, EV>(messagingFunction, messageTypeInfo); + ScatterUdfWithEdgeValues<K, VV, VV, Message, EV> messenger = + new ScatterUdfWithEVsSimpleVV<>(scatterFunction, messageTypeInfo); messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg) .equalTo(equalToArg).with(messenger); @@ -482,7 +478,7 @@ public class ScatterGatherIteration<K, VV, Message, EV> // configure coGroup message function with name and broadcast variables messages = messages.name("Messaging"); if(this.configuration != null) { - for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) { + for (Tuple2<String, DataSet<?>> e : this.configuration.getScatterBcastVars()) { messages = messages.withBroadcastSet(e.f1, e.f0); } if (this.configuration.isOptNumVertices()) { @@ -494,7 +490,7 @@ public class ScatterGatherIteration<K, VV, Message, EV> } /** - * Method that builds the messaging function using a coGroup operator for a vertex + * Method that builds the scatter function using a coGroup operator for a vertex * containing degree information. * It afterwards configures the function with a custom name and broadcast variables. * @@ -502,17 +498,17 @@ public class ScatterGatherIteration<K, VV, Message, EV> * @param messageTypeInfo * @param whereArg the argument for the where within the coGroup * @param equalToArg the argument for the equalTo within the coGroup - * @return the messaging function + * @return the scatter function */ - private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunctionVerticesWithDegrees( + private CoGroupOperator<?, ?, Tuple2<K, Message>> buildScatterFunctionVerticesWithDegrees( DeltaIteration<Vertex<K, Tuple3<VV, LongValue, LongValue>>, Vertex<K, Tuple3<VV, LongValue, LongValue>>> iteration, TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg, DataSet<LongValue> numberOfVertices) { - // build the messaging function (co group) + // build the scatter function (co group) CoGroupOperator<?, ?, Tuple2<K, Message>> messages; - MessagingUdfWithEdgeValues<K, Tuple3<VV, LongValue, LongValue>, VV, Message, EV> messenger = - new MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>(messagingFunction, messageTypeInfo); + ScatterUdfWithEdgeValues<K, Tuple3<VV, LongValue, LongValue>, VV, Message, EV> messenger = + new ScatterUdfWithEVsVVWithDegrees<>(scatterFunction, messageTypeInfo); messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg) .equalTo(equalToArg).with(messenger); @@ -521,7 +517,7 @@ public class ScatterGatherIteration<K, VV, Message, EV> messages = messages.name("Messaging"); if (this.configuration != null) { - for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) { + for (Tuple2<String, DataSet<?>> e : this.configuration.getScatterBcastVars()) { messages = messages.withBroadcastSet(e.f1, e.f0); } if (this.configuration.isOptNumVertices()) { @@ -543,7 +539,7 @@ public class ScatterGatherIteration<K, VV, Message, EV> // set up the iteration operator if (this.configuration != null) { - iteration.name(this.configuration.getName("Scatter-gather iteration (" + updateFunction + " | " + messagingFunction + ")")); + iteration.name(this.configuration.getName("Scatter-gather iteration (" + gatherFunction + " | " + scatterFunction + ")")); iteration.parallelism(this.configuration.getParallelism()); iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory()); @@ -554,7 +550,7 @@ public class ScatterGatherIteration<K, VV, Message, EV> } else { // no configuration provided; set default name - iteration.name("Scatter-gather iteration (" + updateFunction + " | " + messagingFunction + ")"); + iteration.name("Scatter-gather iteration (" + gatherFunction + " | " + scatterFunction + ")"); } } @@ -579,21 +575,21 @@ public class ScatterGatherIteration<K, VV, Message, EV> switch (messagingDirection) { case IN: - messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0, numberOfVertices); + messages = buildScatterFunction(iteration, messageTypeInfo, 1, 0, numberOfVertices); break; case OUT: - messages = buildMessagingFunction(iteration, messageTypeInfo, 0, 0, numberOfVertices); + messages = buildScatterFunction(iteration, messageTypeInfo, 0, 0, numberOfVertices); break; case ALL: - messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0, numberOfVertices) - .union(buildMessagingFunction(iteration, messageTypeInfo, 0, 0, numberOfVertices)) ; + messages = buildScatterFunction(iteration, messageTypeInfo, 1, 0, numberOfVertices) + .union(buildScatterFunction(iteration, messageTypeInfo, 0, 0, numberOfVertices)) ; break; default: throw new IllegalArgumentException("Illegal edge direction"); } - VertexUpdateUdf<K, VV, Message> updateUdf = - new VertexUpdateUdfSimpleVV<K, VV, Message>(updateFunction, vertexTypes); + GatherUdf<K, VV, Message> updateUdf = + new GatherUdfSimpleVV<K, VV, Message>(gatherFunction, vertexTypes); // build the update function (co group) CoGroupOperator<?, ?, Vertex<K, VV>> updates = @@ -624,7 +620,7 @@ public class ScatterGatherIteration<K, VV, Message, EV> DataSet<Tuple2<K, Message>> messages; - this.updateFunction.setOptDegrees(this.configuration.isOptDegrees()); + this.gatherFunction.setOptDegrees(this.configuration.isOptDegrees()); DataSet<Tuple2<K, LongValue>> inDegrees = graph.inDegrees(); DataSet<Tuple2<K, LongValue>> outDegrees = graph.outDegrees(); @@ -634,7 +630,7 @@ public class ScatterGatherIteration<K, VV, Message, EV> @Override public void join(Tuple2<K, LongValue> first, Tuple2<K, LongValue> second, Collector<Tuple3<K, LongValue, LongValue>> out) { - out.collect(new Tuple3<K, LongValue, LongValue>(first.f0, first.f1, second.f1)); + out.collect(new Tuple3<>(first.f0, first.f1, second.f1)); } }).withForwardedFieldsFirst("f0;f1").withForwardedFieldsSecond("f1"); @@ -644,9 +640,8 @@ public class ScatterGatherIteration<K, VV, Message, EV> @Override public void join(Vertex<K, VV> vertex, Tuple3<K, LongValue, LongValue> degrees, Collector<Vertex<K, Tuple3<VV, LongValue, LongValue>>> out) throws Exception { - - out.collect(new Vertex<K, Tuple3<VV, LongValue, LongValue>>(vertex.getId(), - new Tuple3<VV, LongValue, LongValue>(vertex.getValue(), degrees.f1, degrees.f2))); + out.collect(new Vertex<>(vertex.getId(), + new Tuple3<>(vertex.getValue(), degrees.f1, degrees.f2))); } }).withForwardedFieldsFirst("f0"); @@ -659,22 +654,22 @@ public class ScatterGatherIteration<K, VV, Message, EV> switch (messagingDirection) { case IN: - messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, numberOfVertices); + messages = buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, numberOfVertices); break; case OUT: - messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0, numberOfVertices); + messages = buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0, numberOfVertices); break; case ALL: - messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, numberOfVertices) - .union(buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0, numberOfVertices)) ; + messages = buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, numberOfVertices) + .union(buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0, numberOfVertices)) ; break; default: throw new IllegalArgumentException("Illegal edge direction"); } @SuppressWarnings({ "unchecked", "rawtypes" }) - VertexUpdateUdf<K, Tuple3<VV, LongValue, LongValue>, Message> updateUdf = - new VertexUpdateUdfVVWithDegrees(updateFunction, vertexTypes); + GatherUdf<K, Tuple3<VV, LongValue, LongValue>, Message> updateUdf = + new GatherUdfVVWithDegrees(gatherFunction, vertexTypes); // build the update function (co group) CoGroupOperator<?, ?, Vertex<K, Tuple3<VV, LongValue, LongValue>>> updates = @@ -690,7 +685,7 @@ public class ScatterGatherIteration<K, VV, Message, EV> new MapFunction<Vertex<K, Tuple3<VV, LongValue, LongValue>>, Vertex<K, VV>>() { public Vertex<K, VV> map(Vertex<K, Tuple3<VV, LongValue, LongValue>> vertex) { - return new Vertex<K, VV>(vertex.getId(), vertex.getValue().f0); + return new Vertex<>(vertex.getId(), vertex.getValue().f0); } }); } @@ -700,7 +695,7 @@ public class ScatterGatherIteration<K, VV, Message, EV> // configure coGroup update function with name and broadcast variables updates = updates.name("Vertex State Updates"); if (this.configuration != null) { - for (Tuple2<String, DataSet<?>> e : this.configuration.getUpdateBcastVars()) { + for (Tuple2<String, DataSet<?>> e : this.configuration.getGatherBcastVars()) { updates = updates.withBroadcastSet(e.f1, e.f0); } } http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java deleted file mode 100644 index 9085432..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.spargel; - -import java.io.Serializable; -import java.util.Collection; - -import org.apache.flink.api.common.aggregators.Aggregator; -import org.apache.flink.api.common.functions.IterationRuntimeContext; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.graph.Vertex; -import org.apache.flink.types.Value; -import org.apache.flink.util.Collector; - -/** - * This class must be extended by functions that compute the state of the vertex depending on the old state and the - * incoming messages. The central method is {@link #updateVertex(Vertex, MessageIterator)}, which is - * invoked once per vertex per superstep. - * - * {@code <K>} The vertex key type. - * {@code <VV>} The vertex value type. - * {@code <Message>} The message type. - */ -public abstract class VertexUpdateFunction<K, VV, Message> implements Serializable { - - private static final long serialVersionUID = 1L; - - // -------------------------------------------------------------------------------------------- - // Attributes that allow vertices to access their in/out degrees and the total number of vertices - // inside an iteration. - // -------------------------------------------------------------------------------------------- - - private long numberOfVertices = -1L; - - /** - * Retrieves the number of vertices in the graph. - * @return the number of vertices if the {@link org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)} - * option has been set; -1 otherwise. - */ - public long getNumberOfVertices() { - return numberOfVertices; - } - - void setNumberOfVertices(long numberOfVertices) { - this.numberOfVertices = numberOfVertices; - } - - //--------------------------------------------------------------------------------------------- - - private boolean optDegrees; - - boolean isOptDegrees() { - return optDegrees; - } - - void setOptDegrees(boolean optDegrees) { - this.optDegrees = optDegrees; - } - - // -------------------------------------------------------------------------------------------- - // Public API Methods - // -------------------------------------------------------------------------------------------- - - /** - * This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as - * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex - * state is changed, it will trigger the sending of messages via the {@link MessagingFunction}. - * - * @param vertex The vertex. - * @param inMessages The incoming messages to this vertex. - * - * @throws Exception The computation may throw exceptions, which causes the superstep to fail. - */ - public abstract void updateVertex(Vertex<K, VV> vertex, MessageIterator<Message> inMessages) throws Exception; - - /** - * This method is executed one per superstep before the vertex update function is invoked for each vertex. - * - * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail. - */ - public void preSuperstep() throws Exception {} - - /** - * This method is executed one per superstep after the vertex update function has been invoked for each vertex. - * - * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail. - */ - public void postSuperstep() throws Exception {} - - /** - * Sets the new value of this vertex. Setting a new value triggers the sending of outgoing messages from this vertex. - * - * This should be called at most once per updateVertex. - * - * @param newValue The new vertex value. - */ - public void setNewVertexValue(VV newValue) { - if(setNewVertexValueCalled) { - throw new IllegalStateException("setNewVertexValue should only be called at most once per updateVertex"); - } - setNewVertexValueCalled = true; - if(isOptDegrees()) { - outValWithDegrees.f1.f0 = newValue; - outWithDegrees.collect(outValWithDegrees); - } else { - outVal.setValue(newValue); - out.collect(outVal); - } - } - - /** - * Gets the number of the superstep, starting at <tt>1</tt>. - * - * @return The number of the current superstep. - */ - public int getSuperstepNumber() { - return this.runtimeContext.getSuperstepNumber(); - } - - /** - * Gets the iteration aggregator registered under the given name. The iteration aggregator combines - * all aggregates globally once per superstep and makes them available in the next superstep. - * - * @param name The name of the aggregator. - * @return The aggregator registered under this name, or null, if no aggregator was registered. - */ - public <T extends Aggregator<?>> T getIterationAggregator(String name) { - return this.runtimeContext.<T>getIterationAggregator(name); - } - - /** - * Get the aggregated value that an aggregator computed in the previous iteration. - * - * @param name The name of the aggregator. - * @return The aggregated value of the previous iteration. - */ - public <T extends Value> T getPreviousIterationAggregate(String name) { - return this.runtimeContext.<T>getPreviousIterationAggregate(name); - } - - /** - * Gets the broadcast data set registered under the given name. Broadcast data sets - * are available on all parallel instances of a function. They can be registered via - * {@link org.apache.flink.graph.spargel.ScatterGatherConfiguration#addBroadcastSetForUpdateFunction(String, org.apache.flink.api.java.DataSet)}. - * - * @param name The name under which the broadcast set is registered. - * @return The broadcast data set. - */ - public <T> Collection<T> getBroadcastSet(String name) { - return this.runtimeContext.<T>getBroadcastVariable(name); - } - - // -------------------------------------------------------------------------------------------- - // internal methods - // -------------------------------------------------------------------------------------------- - - private IterationRuntimeContext runtimeContext; - - private Collector<Vertex<K, VV>> out; - - private Collector<Vertex<K, Tuple3<VV, Long, Long>>> outWithDegrees; - - private Vertex<K, VV> outVal; - - private Vertex<K, Tuple3<VV, Long, Long>> outValWithDegrees; - - private long inDegree = -1; - - private long outDegree = -1; - - private boolean setNewVertexValueCalled; - - void init(IterationRuntimeContext context) { - this.runtimeContext = context; - } - - void setOutput(Vertex<K, VV> outVal, Collector<Vertex<K, VV>> out) { - this.outVal = outVal; - this.out = out; - setNewVertexValueCalled = false; - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - <ValueWithDegree> void setOutputWithDegrees(Vertex<K, ValueWithDegree> outVal, - Collector out) { - this.outValWithDegrees = (Vertex<K, Tuple3<VV, Long, Long>>) outVal; - this.outWithDegrees = out; - setNewVertexValueCalled = false; - } - - /** - * Retrieves the vertex in-degree (number of in-coming edges). - * @return The in-degree of this vertex - */ - public long getInDegree() { - return inDegree; - } - - void setInDegree(long inDegree) { - this.inDegree = inDegree; - } - - /** - * Retrieve the vertex out-degree (number of out-going edges). - * @return The out-degree of this vertex - */ - public long getOutDegree() { - return outDegree; - } - - void setOutDegree(long outDegree) { - this.outDegree = outDegree; - } - - /** - * In order to hide the Tuple3(actualValue, inDegree, OutDegree) vertex value from the user, - * another function will be called from {@link org.apache.flink.graph.spargel.ScatterGatherIteration}. - * - * This function will retrieve the vertex from the vertexState and will set its degrees, afterwards calling - * the regular updateVertex function. - * - * @param vertexState - * @param inMessages - * @throws Exception - */ - @SuppressWarnings("unchecked") - <VertexWithDegree> void updateVertexFromScatterGatherIteration(Vertex<K, VertexWithDegree> vertexState, - MessageIterator<Message> inMessages) throws Exception { - - Vertex<K, VV> vertex = new Vertex<K, VV>(vertexState.f0, - ((Tuple3<VV, Long, Long>)vertexState.getValue()).f0); - - updateVertex(vertex, inMessages); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java index 3a750af..14c2fb4 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java @@ -75,8 +75,8 @@ public class SpargelCompilerTest extends CompilerTestBase { Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env); DataSet<Vertex<Long, Long>> result = graph.runScatterGatherIteration( - new ConnectedComponents.CCUpdater<Long, Long>(), - new ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO), 100) + new ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO), + new ConnectedComponents.CCUpdater<Long, Long>(), 100) .getVertices(); result.output(new DiscardingOutputFormat<Vertex<Long, Long>>()); @@ -157,12 +157,12 @@ public class SpargelCompilerTest extends CompilerTestBase { Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env); ScatterGatherConfiguration parameters = new ScatterGatherConfiguration(); - parameters.addBroadcastSetForMessagingFunction(BC_VAR_NAME, bcVar); - parameters.addBroadcastSetForUpdateFunction(BC_VAR_NAME, bcVar); + parameters.addBroadcastSetForScatterFunction(BC_VAR_NAME, bcVar); + parameters.addBroadcastSetForGatherFunction(BC_VAR_NAME, bcVar); DataSet<Vertex<Long, Long>> result = graph.runScatterGatherIteration( - new ConnectedComponents.CCUpdater<Long, Long>(), - new ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO), 100) + new ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO), + new ConnectedComponents.CCUpdater<Long, Long>(), 100) .getVertices(); result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());