[FLINK-1201] [gelly] changed spargel classes to work with Vertex and Edge types
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d45c0491 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d45c0491 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d45c0491 Branch: refs/heads/master Commit: d45c049167ae30ae42fc63a46d9da29df2369652 Parents: b0b1295 Author: vasia <vasilikikala...@gmail.com> Authored: Thu Jan 8 20:29:14 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 11 10:46:14 2015 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/flink/graph/Graph.java | 10 +- .../apache/flink/graph/library/PageRank.java | 7 +- .../library/SingleSourceShortestPaths.java | 5 +- .../flink/graph/spargel/MessagingFunction.java | 89 ++----- .../flink/graph/spargel/OutgoingEdge.java | 64 ----- .../graph/spargel/VertexCentricIteration.java | 237 ++++--------------- .../graph/spargel/VertexUpdateFunction.java | 12 +- 7 files changed, 83 insertions(+), 341 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d45c0491/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 247f5fc..425a377 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -53,7 +53,6 @@ import flink.graphs.spargel.MessagingFunction; import flink.graphs.spargel.VertexCentricIteration; import flink.graphs.spargel.VertexUpdateFunction; import flink.graphs.utils.GraphUtils; -import flink.graphs.utils.Tuple2ToVertexMap; import flink.graphs.validation.GraphValidator; /** @@ -1058,15 +1057,12 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab * @param maximumNumberOfIterations maximum number of iterations to perform * @return */ - @SuppressWarnings("unchecked") public <M>Graph<K, VV, EV> runVertexCentricIteration(VertexUpdateFunction<K, VV, M> vertexUpdateFunction, MessagingFunction<K, VV, M, EV> messagingFunction, int maximumNumberOfIterations) { - DataSet<Tuple2<K, VV>> tupleVertices = (DataSet<Tuple2<K, VV>>) (DataSet<?>) vertices; - DataSet<Tuple3<K, K, EV>> tupleEdges = (DataSet<Tuple3<K, K, EV>>) (DataSet<?>) edges; - DataSet<Tuple2<K, VV>> newVertices = tupleVertices.runOperation( - VertexCentricIteration.withValuedEdges(tupleEdges, + DataSet<Vertex<K, VV>> newVertices = vertices.runOperation( + VertexCentricIteration.withEdges(edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations)); - return new Graph<K, VV, EV>(newVertices.map(new Tuple2ToVertexMap<K, VV>()), edges, context); + return new Graph<K, VV, EV>(newVertices, edges, context); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/d45c0491/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java index b7ca52b..d29a9dc 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java @@ -1,11 +1,10 @@ package flink.graphs.library; - +import flink.graphs.Edge; import flink.graphs.Graph; import flink.graphs.GraphAlgorithm; import flink.graphs.spargel.MessageIterator; import flink.graphs.spargel.MessagingFunction; -import flink.graphs.spargel.OutgoingEdge; import flink.graphs.spargel.VertexUpdateFunction; import java.io.Serializable; @@ -69,8 +68,8 @@ public class PageRank<K extends Comparable<K> & Serializable> implements GraphAl @Override public void sendMessages(K vertexId, Double newRank) { - for (OutgoingEdge<K, Double> edge : getOutgoingEdges()) { - sendMessageTo(edge.target(), newRank * edge.edgeValue()); + for (Edge<K, Double> edge : getOutgoingEdges()) { + sendMessageTo(edge.getTarget(), newRank * edge.getValue()); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/d45c0491/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java index c29909c..0da8a90 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java @@ -3,7 +3,6 @@ package flink.graphs.library; import flink.graphs.*; import flink.graphs.spargel.MessageIterator; import flink.graphs.spargel.MessagingFunction; -import flink.graphs.spargel.OutgoingEdge; import flink.graphs.spargel.VertexUpdateFunction; import org.apache.flink.api.common.functions.MapFunction; @@ -87,8 +86,8 @@ public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable> i @Override public void sendMessages(K vertexKey, Double newDistance) throws Exception { - for (OutgoingEdge<K, Double> edge : getOutgoingEdges()) { - sendMessageTo(edge.target(), newDistance + edge.edgeValue()); + for (Edge<K, Double> edge : getOutgoingEdges()) { + sendMessageTo(edge.getTarget(), newDistance + edge.getValue()); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/d45c0491/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java index 52a881e..ab451bb 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java @@ -26,10 +26,11 @@ import org.apache.flink.api.common.aggregators.Aggregator; import org.apache.flink.api.common.functions.IterationRuntimeContext; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.types.Value; import org.apache.flink.util.Collector; +import flink.graphs.Edge; + /** * The base class for functions that produce messages between vertices as a part of a {@link VertexCentricIteration}. * @@ -38,7 +39,8 @@ import org.apache.flink.util.Collector; * @param <Message> The type of the message sent between vertices along the edges. * @param <EdgeValue> The type of the values that are associated with the edges. */ -public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> implements Serializable { +public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey> & Serializable, + VertexValue extends Serializable, Message, EdgeValue extends Serializable> implements Serializable { private static final long serialVersionUID = 1L; @@ -79,19 +81,13 @@ public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>, * @return An iterator with all outgoing edges. */ @SuppressWarnings("unchecked") - public Iterable<OutgoingEdge<VertexKey, EdgeValue>> getOutgoingEdges() { + public Iterable<Edge<VertexKey, EdgeValue>> getOutgoingEdges() { if (edgesUsed) { throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllTargets()' exactly once."); } edgesUsed = true; - - if (this.edgeWithValueIter != null) { - this.edgeWithValueIter.set((Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>>) edges); - return this.edgeWithValueIter; - } else { - this.edgeNoValueIter.set((Iterator<Tuple2<VertexKey, VertexKey>>) edges); - return this.edgeNoValueIter; - } + this.edgeIterator.set((Iterator<Edge<VertexKey, EdgeValue>>) edges); + return this.edgeIterator; } /** @@ -186,22 +182,15 @@ public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>, private Collector<Tuple2<VertexKey, Message>> out; - private EdgesIteratorNoEdgeValue<VertexKey, EdgeValue> edgeNoValueIter; - - private EdgesIteratorWithEdgeValue<VertexKey, EdgeValue> edgeWithValueIter; + private EdgesIterator<VertexKey, EdgeValue> edgeIterator; private boolean edgesUsed; - void init(IterationRuntimeContext context, boolean hasEdgeValue) { + void init(IterationRuntimeContext context) { this.runtimeContext = context; this.outValue = new Tuple2<VertexKey, Message>(); - - if (hasEdgeValue) { - this.edgeWithValueIter = new EdgesIteratorWithEdgeValue<VertexKey, EdgeValue>(); - } else { - this.edgeNoValueIter = new EdgesIteratorNoEdgeValue<VertexKey, EdgeValue>(); - } + this.edgeIterator = new EdgesIterator<VertexKey, EdgeValue>(); } void set(Iterator<?> edges, Collector<Tuple2<VertexKey, Message>> out) { @@ -210,52 +199,15 @@ public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>, this.edgesUsed = false; } - - - private static final class EdgesIteratorNoEdgeValue<VertexKey extends Comparable<VertexKey>, EdgeValue> - implements Iterator<OutgoingEdge<VertexKey, EdgeValue>>, Iterable<OutgoingEdge<VertexKey, EdgeValue>> - { - private Iterator<Tuple2<VertexKey, VertexKey>> input; - - private OutgoingEdge<VertexKey, EdgeValue> edge = new OutgoingEdge<VertexKey, EdgeValue>(); - - - void set(Iterator<Tuple2<VertexKey, VertexKey>> input) { - this.input = input; - } - - @Override - public boolean hasNext() { - return input.hasNext(); - } - - @Override - public OutgoingEdge<VertexKey, EdgeValue> next() { - Tuple2<VertexKey, VertexKey> next = input.next(); - edge.set(next.f1, null); - return edge; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - @Override - public Iterator<OutgoingEdge<VertexKey, EdgeValue>> iterator() { - return this; - } - } - - - private static final class EdgesIteratorWithEdgeValue<VertexKey extends Comparable<VertexKey>, EdgeValue> - implements Iterator<OutgoingEdge<VertexKey, EdgeValue>>, Iterable<OutgoingEdge<VertexKey, EdgeValue>> + private static final class EdgesIterator<VertexKey extends Comparable<VertexKey> & Serializable, + EdgeValue extends Serializable> + implements Iterator<Edge<VertexKey, EdgeValue>>, Iterable<Edge<VertexKey, EdgeValue>> { - private Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> input; + private Iterator<Edge<VertexKey, EdgeValue>> input; - private OutgoingEdge<VertexKey, EdgeValue> edge = new OutgoingEdge<VertexKey, EdgeValue>(); + private Edge<VertexKey, EdgeValue> edge = new Edge<VertexKey, EdgeValue>(); - void set(Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> input) { + void set(Iterator<Edge<VertexKey, EdgeValue>> input) { this.input = input; } @@ -265,9 +217,10 @@ public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>, } @Override - public OutgoingEdge<VertexKey, EdgeValue> next() { - Tuple3<VertexKey, VertexKey, EdgeValue> next = input.next(); - edge.set(next.f1, next.f2); + public Edge<VertexKey, EdgeValue> next() { + Edge<VertexKey, EdgeValue> next = input.next(); + edge.setTarget(next.f1); + edge.setValue(next.f2); return edge; } @@ -276,7 +229,7 @@ public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>, throw new UnsupportedOperationException(); } @Override - public Iterator<OutgoingEdge<VertexKey, EdgeValue>> iterator() { + public Iterator<Edge<VertexKey, EdgeValue>> iterator() { return this; } } http://git-wip-us.apache.org/repos/asf/flink/blob/d45c0491/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/OutgoingEdge.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/OutgoingEdge.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/OutgoingEdge.java deleted file mode 100644 index 7505409..0000000 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/OutgoingEdge.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package flink.graphs.spargel; - -/** - * <tt>Edge</tt> objects represent edges between vertices. Edges are defined by their source and target - * vertex id. Edges may have an associated value (for example a weight or a distance), if the - * graph algorithm was initialized with the - * {@link VertexCentricIteration#withValuedEdges(org.apache.flink.api.java.DataSet, VertexUpdateFunction, MessagingFunction, int)} - * method. - * - * @param <VertexKey> The type of the vertex key. - * @param <EdgeValue> The type of the value associated with the edge. For scenarios where the edges do not hold - * value, this type may be arbitrary. - */ -public final class OutgoingEdge<VertexKey extends Comparable<VertexKey>, EdgeValue> implements java.io.Serializable { - - private static final long serialVersionUID = 1L; - - private VertexKey target; - - private EdgeValue edgeValue; - - void set(VertexKey target, EdgeValue edgeValue) { - this.target = target; - this.edgeValue = edgeValue; - } - - /** - * Gets the target vertex id. - * - * @return The target vertex id. - */ - public VertexKey target() { - return target; - } - - /** - * Gets the value associated with the edge. The value may be null if the iteration was initialized with - * an edge data set without edge values. - * Typical examples of edge values are weights or distances of the path represented by the edge. - * - * @return The value associated with the edge. - */ - public EdgeValue edgeValue() { - return edgeValue; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d45c0491/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java index b15c8c4..5f89e90 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java @@ -18,6 +18,7 @@ package flink.graphs.spargel; +import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -33,13 +34,15 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.operators.CoGroupOperator; import org.apache.flink.api.java.operators.CustomUnaryOperation; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; +import flink.graphs.Edge; +import flink.graphs.Vertex; + /** * This class represents iterative graph computations, programmed in a vertex-centric perspective. * It is a special case of <i>Bulk Synchronous Parallel<i> computation. The paradigm has also been @@ -70,16 +73,15 @@ import org.apache.flink.util.Collector; * @param <Message> The type of the message sent between vertices along the edges. * @param <EdgeValue> The type of the values that are associated with the edges. */ -public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> - implements CustomUnaryOperation<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>> +public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Serializable, VertexValue extends Serializable, + Message, EdgeValue extends Serializable> + implements CustomUnaryOperation<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> { private final VertexUpdateFunction<VertexKey, VertexValue, Message> updateFunction; private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction; - private final DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue; - - private final DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue; + private final DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue; private final Map<String, Aggregator<?>> aggregators; @@ -91,7 +93,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver private final TypeInformation<Message> messageType; - private DataSet<Tuple2<VertexKey, VertexValue>> initialVertices; + private DataSet<Vertex<VertexKey, VertexValue>> initialVertices; private String name; @@ -101,64 +103,21 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver // ---------------------------------------------------------------------------------- - private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf, - MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf, - DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue, - int maximumNumberOfIterations) - { - Validate.notNull(uf); - Validate.notNull(mf); - Validate.notNull(edgesWithoutValue); - Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one."); - - // check that the edges are actually a valid tuple set of vertex key types - TypeInformation<Tuple2<VertexKey, VertexKey>> edgesType = edgesWithoutValue.getType(); - Validate.isTrue(edgesType.isTupleType() && edgesType.getArity() == 2, "The edges data set (for edges without edge values) must consist of 2-tuples."); - - TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgesType; - Validate.isTrue(tupleInfo.getTypeAt(0).equals(tupleInfo.getTypeAt(1)) - && Comparable.class.isAssignableFrom(tupleInfo.getTypeAt(0).getTypeClass()), - "Both tuple fields (source and target vertex id) must be of the data type that represents the vertex key and implement the java.lang.Comparable interface."); - - this.updateFunction = uf; - this.messagingFunction = mf; - this.edgesWithoutValue = edgesWithoutValue; - this.edgesWithValue = null; - this.maximumNumberOfIterations = maximumNumberOfIterations; - this.aggregators = new HashMap<String, Aggregator<?>>(); - - this.messageType = getMessageType(mf); - } - private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf, MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf, - DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue, - int maximumNumberOfIterations, - boolean edgeHasValueMarker) + DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue, + int maximumNumberOfIterations, boolean edgeHasValueMarker) { Validate.notNull(uf); Validate.notNull(mf); Validate.notNull(edgesWithValue); Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one."); - - // check that the edges are actually a valid tuple set of vertex key types - TypeInformation<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesType = edgesWithValue.getType(); - Validate.isTrue(edgesType.isTupleType() && edgesType.getArity() == 3, "The edges data set (for edges with edge values) must consist of 3-tuples."); - - TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgesType; - Validate.isTrue(tupleInfo.getTypeAt(0).equals(tupleInfo.getTypeAt(1)) - && Comparable.class.isAssignableFrom(tupleInfo.getTypeAt(0).getTypeClass()), - "The first two tuple fields (source and target vertex id) must be of the data type that represents the vertex key and implement the java.lang.Comparable interface."); - - Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one."); - + this.updateFunction = uf; this.messagingFunction = mf; - this.edgesWithoutValue = null; this.edgesWithValue = edgesWithValue; this.maximumNumberOfIterations = maximumNumberOfIterations; - this.aggregators = new HashMap<String, Aggregator<?>>(); - + this.aggregators = new HashMap<String, Aggregator<?>>(); this.messageType = getMessageType(mf); } @@ -271,20 +230,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver * @see org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet) */ @Override - public void setInput(DataSet<Tuple2<VertexKey, VertexValue>> inputData) { - // sanity check that we really have two tuples - TypeInformation<Tuple2<VertexKey, VertexValue>> inputType = inputData.getType(); - Validate.isTrue(inputType.isTupleType() && inputType.getArity() == 2, "The input data set (the initial vertices) must consist of 2-tuples."); - - // check that the key type here is the same as for the edges - TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) inputType).getTypeAt(0); - TypeInformation<?> edgeType = edgesWithoutValue != null ? edgesWithoutValue.getType() : edgesWithValue.getType(); - TypeInformation<VertexKey> edgeKeyType = ((TupleTypeInfo<?>) edgeType).getTypeAt(0); - - Validate.isTrue(keyType.equals(edgeKeyType), "The first tuple field (the vertex id) of the input data set (the initial vertices) " + - "must be the same data type as the first fields of the edge data set (the source vertex id). " + - "Here, the key type for the vertex ids is '%s' and the key type for the edges is '%s'.", keyType, edgeKeyType); - + public void setInput(DataSet<Vertex<VertexKey, VertexValue>> inputData) { this.initialVertices = inputData; } @@ -294,22 +240,22 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver * @return The operator that represents this vertex-centric graph computation. */ @Override - public DataSet<Tuple2<VertexKey, VertexValue>> createResult() { + public DataSet<Vertex<VertexKey, VertexValue>> createResult() { if (this.initialVertices == null) { throw new IllegalStateException("The input data set has not been set."); } // prepare some type information - TypeInformation<Tuple2<VertexKey, VertexValue>> vertexTypes = initialVertices.getType(); + TypeInformation<Vertex<VertexKey, VertexValue>> vertexTypes = initialVertices.getType(); TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0); - TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType); - + TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType); + // set up the iteration operator final String name = (this.name != null) ? this.name : "Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")"; final int[] zeroKeyPos = new int[] {0}; - final DeltaIteration<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>> iteration = + final DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> iteration = this.initialVertices.iterateDelta(this.initialVertices, this.maximumNumberOfIterations, zeroKeyPos); iteration.name(name); iteration.parallelism(parallelism); @@ -322,14 +268,8 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver // build the messaging function (co group) CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages; - if (edgesWithoutValue != null) { - MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message> messenger = new MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message>(messagingFunction, messageTypeInfo); - messages = this.edgesWithoutValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger); - } - else { - MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue> messenger = new MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue>(messagingFunction, messageTypeInfo); - messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger); - } + MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue> messenger = new MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue>(messagingFunction, messageTypeInfo); + messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger); // configure coGroup message function with name and broadcast variables messages = messages.name("Messaging"); @@ -340,7 +280,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver VertexUpdateUdf<VertexKey, VertexValue, Message> updateUdf = new VertexUpdateUdf<VertexKey, VertexValue, Message>(updateFunction, vertexTypes); // build the update function (co group) - CoGroupOperator<?, ?, Tuple2<VertexKey, VertexValue>> updates = + CoGroupOperator<?, ?, Vertex<VertexKey, VertexValue>> updates = messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf); // configure coGroup update function with name and broadcast variables @@ -355,43 +295,12 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver return iteration.closeWith(updates, updates); } - - // -------------------------------------------------------------------------------------------- - // Constructor builders to avoid signature conflicts with generic type erasure - // -------------------------------------------------------------------------------------------- - - /** - * Creates a new vertex-centric iteration operator for graphs where the edges are not associated with a value. - * - * @param edgesWithoutValue The data set containing edges. Edges are represented as 2-tuples: (source-id, target-id) - * @param vertexUpdateFunction The function that updates the state of the vertices from the incoming messages. - * @param messagingFunction The function that turns changed vertex states into messages along the edges. - * - * @param <VertexKey> The type of the vertex key (the vertex identifier). - * @param <VertexValue> The type of the vertex value (the state of the vertex). - * @param <Message> The type of the message sent between vertices along the edges. - * - * @return An in stance of the vertex-centric graph computation operator. - */ - public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message> - VertexCentricIteration<VertexKey, VertexValue, Message, ?> withPlainEdges( - DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue, - VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, - MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction, - int maximumNumberOfIterations) - { - @SuppressWarnings("unchecked") - MessagingFunction<VertexKey, VertexValue, Message, Object> tmf = - (MessagingFunction<VertexKey, VertexValue, Message, Object>) messagingFunction; - - return new VertexCentricIteration<VertexKey, VertexValue, Message, Object>(vertexUpdateFunction, tmf, edgesWithoutValue, maximumNumberOfIterations); - } - + /** * Creates a new vertex-centric iteration operator for graphs where the edges are associated with a value (such as * a weight or distance). * - * @param edgesWithValue The data set containing edges. Edges are represented as 2-tuples: (source-id, target-id) + * @param edgesWithValue The data set containing edges. * @param uf The function that updates the state of the vertices from the incoming messages. * @param mf The function that turns changed vertex states into messages along the edges. * @@ -402,9 +311,10 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver * * @return An in stance of the vertex-centric graph computation operator. */ - public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> - VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue> withValuedEdges( - DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue, + public static final <VertexKey extends Comparable<VertexKey> & Serializable, VertexValue extends Serializable, + Message, EdgeValue extends Serializable> + VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue> withEdges( + DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue, VertexUpdateFunction<VertexKey, VertexValue, Message> uf, MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf, int maximumNumberOfIterations) @@ -416,9 +326,10 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver // Wrapping UDFs // -------------------------------------------------------------------------------------------- - private static final class VertexUpdateUdf<VertexKey extends Comparable<VertexKey>, VertexValue, Message> - extends RichCoGroupFunction<Tuple2<VertexKey, Message>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>> - implements ResultTypeQueryable<Tuple2<VertexKey, VertexValue>> + private static final class VertexUpdateUdf<VertexKey extends Comparable<VertexKey> & Serializable, + VertexValue extends Serializable, Message> + extends RichCoGroupFunction<Tuple2<VertexKey, Message>, Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> + implements ResultTypeQueryable<Vertex<VertexKey, VertexValue>> { private static final long serialVersionUID = 1L; @@ -426,25 +337,25 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver private final MessageIterator<Message> messageIter = new MessageIterator<Message>(); - private transient TypeInformation<Tuple2<VertexKey, VertexValue>> resultType; + private transient TypeInformation<Vertex<VertexKey, VertexValue>> resultType; private VertexUpdateUdf(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, - TypeInformation<Tuple2<VertexKey, VertexValue>> resultType) + TypeInformation<Vertex<VertexKey, VertexValue>> resultType) { this.vertexUpdateFunction = vertexUpdateFunction; this.resultType = resultType; } @Override - public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages, Iterable<Tuple2<VertexKey, VertexValue>> vertex, - Collector<Tuple2<VertexKey, VertexValue>> out) + public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages, Iterable<Vertex<VertexKey, VertexValue>> vertex, + Collector<Vertex<VertexKey, VertexValue>> out) throws Exception { - final Iterator<Tuple2<VertexKey, VertexValue>> vertexIter = vertex.iterator(); + final Iterator<Vertex<VertexKey, VertexValue>> vertexIter = vertex.iterator(); if (vertexIter.hasNext()) { - Tuple2<VertexKey, VertexValue> vertexState = vertexIter.next(); + Vertex<VertexKey, VertexValue> vertexState = vertexIter.next(); @SuppressWarnings("unchecked") Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator(); @@ -482,71 +393,17 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver } @Override - public TypeInformation<Tuple2<VertexKey, VertexValue>> getProducedType() { + public TypeInformation<Vertex<VertexKey, VertexValue>> getProducedType() { return this.resultType; } } - - /* - * UDF that encapsulates the message sending function for graphs where the edges have no associated values. - */ - private static final class MessagingUdfNoEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message> - extends RichCoGroupFunction<Tuple2<VertexKey, VertexKey>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>> - implements ResultTypeQueryable<Tuple2<VertexKey, Message>> - { - private static final long serialVersionUID = 1L; - - private final MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction; - - private transient TypeInformation<Tuple2<VertexKey, Message>> resultType; - - - private MessagingUdfNoEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction, - TypeInformation<Tuple2<VertexKey, Message>> resultType) - { - this.messagingFunction = messagingFunction; - this.resultType = resultType; - } - - @Override - public void coGroup(Iterable<Tuple2<VertexKey, VertexKey>> edges, - Iterable<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out) - throws Exception - { - final Iterator<Tuple2<VertexKey, VertexValue>> stateIter = state.iterator(); - - if (stateIter.hasNext()) { - Tuple2<VertexKey, VertexValue> newVertexState = stateIter.next(); - messagingFunction.set((Iterator<?>) edges.iterator(), out); - messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1); - } - } - - @Override - public void open(Configuration parameters) throws Exception { - if (getIterationRuntimeContext().getSuperstepNumber() == 1) { - this.messagingFunction.init(getIterationRuntimeContext(), false); - } - - this.messagingFunction.preSuperstep(); - } - - @Override - public void close() throws Exception { - this.messagingFunction.postSuperstep(); - } - @Override - public TypeInformation<Tuple2<VertexKey, Message>> getProducedType() { - return this.resultType; - } - } - /* * UDF that encapsulates the message sending function for graphs where the edges have an associated value. */ - private static final class MessagingUdfWithEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> - extends RichCoGroupFunction<Tuple3<VertexKey, VertexKey, EdgeValue>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>> + private static final class MessagingUdfWithEdgeValues<VertexKey extends Comparable<VertexKey> & Serializable, + VertexValue extends Serializable, Message, EdgeValue extends Serializable> + extends RichCoGroupFunction<Edge<VertexKey, EdgeValue>, Vertex<VertexKey, VertexValue>, Tuple2<VertexKey, Message>> implements ResultTypeQueryable<Tuple2<VertexKey, Message>> { private static final long serialVersionUID = 1L; @@ -564,14 +421,14 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver } @Override - public void coGroup(Iterable<Tuple3<VertexKey, VertexKey, EdgeValue>> edges, - Iterable<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out) + public void coGroup(Iterable<Edge<VertexKey, EdgeValue>> edges, + Iterable<Vertex<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out) throws Exception { - final Iterator<Tuple2<VertexKey, VertexValue>> stateIter = state.iterator(); + final Iterator<Vertex<VertexKey, VertexValue>> stateIter = state.iterator(); if (stateIter.hasNext()) { - Tuple2<VertexKey, VertexValue> newVertexState = stateIter.next(); + Vertex<VertexKey, VertexValue> newVertexState = stateIter.next(); messagingFunction.set((Iterator<?>) edges.iterator(), out); messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1); } @@ -580,7 +437,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver @Override public void open(Configuration parameters) throws Exception { if (getIterationRuntimeContext().getSuperstepNumber() == 1) { - this.messagingFunction.init(getIterationRuntimeContext(), true); + this.messagingFunction.init(getIterationRuntimeContext()); } this.messagingFunction.preSuperstep(); http://git-wip-us.apache.org/repos/asf/flink/blob/d45c0491/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java index c3fd2b1..e30451c 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java @@ -23,10 +23,11 @@ import java.util.Collection; import org.apache.flink.api.common.aggregators.Aggregator; import org.apache.flink.api.common.functions.IterationRuntimeContext; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.types.Value; import org.apache.flink.util.Collector; +import flink.graphs.Vertex; + /** * This class must be extended by functions that compute the state of the vertex depending on the old state and the * incoming messages. The central method is {@link #updateVertex(Comparable, Object, MessageIterator)}, which is @@ -36,7 +37,8 @@ import org.apache.flink.util.Collector; * <VertexValue> The vertex value type. * <Message> The message type. */ -public abstract class VertexUpdateFunction<VertexKey extends Comparable<VertexKey>, VertexValue, Message> implements Serializable { +public abstract class VertexUpdateFunction<VertexKey extends Comparable<VertexKey> & Serializable, + VertexValue extends Serializable, Message> implements Serializable { private static final long serialVersionUID = 1L; @@ -129,16 +131,16 @@ public abstract class VertexUpdateFunction<VertexKey extends Comparable<VertexKe private IterationRuntimeContext runtimeContext; - private Collector<Tuple2<VertexKey, VertexValue>> out; + private Collector<Vertex<VertexKey, VertexValue>> out; - private Tuple2<VertexKey, VertexValue> outVal; + private Vertex<VertexKey, VertexValue> outVal; void init(IterationRuntimeContext context) { this.runtimeContext = context; } - void setOutput(Tuple2<VertexKey, VertexValue> val, Collector<Tuple2<VertexKey, VertexValue>> out) { + void setOutput(Vertex<VertexKey, VertexValue> val, Collector<Vertex<VertexKey, VertexValue>> out) { this.out = out; this.outVal = val; }