[FLINK-1201] [gelly] get rid of Vertex to Tuple2 and Edge to Tuple3 conversions in runVertexCentricIteration
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/935ad5a7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/935ad5a7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/935ad5a7 Branch: refs/heads/master Commit: 935ad5a7a223c350b6d299ccee0a87cd1fa451de Parents: b0e17e3 Author: vasia <vasilikikala...@gmail.com> Authored: Sun Jan 4 19:19:23 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 11 10:46:13 2015 +0100 ---------------------------------------------------------------------- .../src/main/java/org/apache/flink/graph/Graph.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/935ad5a7/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index ecffbcc..d54f554 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -52,10 +52,8 @@ import org.apache.flink.spargel.java.VertexUpdateFunction; import org.apache.flink.util.Collector; import org.apache.flink.types.NullValue; -import flink.graphs.utils.EdgeToTuple3Map; import flink.graphs.utils.GraphUtils; import flink.graphs.utils.Tuple2ToVertexMap; -import flink.graphs.utils.VertexToTuple2Map; import flink.graphs.validation.GraphValidator; @SuppressWarnings("serial") @@ -958,10 +956,13 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab * @param maximumNumberOfIterations * @return */ - public <M>Graph<K, VV, EV> runVertexCentricIteration(VertexUpdateFunction<K, VV, M> vertexUpdateFunction, + @SuppressWarnings("unchecked") + public <M>Graph<K, VV, EV> runVertexCentricIteration(VertexUpdateFunction<K, VV, M> vertexUpdateFunction, MessagingFunction<K, VV, M, EV> messagingFunction, int maximumNumberOfIterations) { - DataSet<Tuple2<K, VV>> newVertices = vertices.map(new VertexToTuple2Map<K, VV>()).runOperation( - VertexCentricIteration.withValuedEdges(edges.map(new EdgeToTuple3Map<K, EV>()), + DataSet<Tuple2<K, VV>> tupleVertices = (DataSet<Tuple2<K, VV>>) (DataSet<?>) vertices; + DataSet<Tuple3<K, K, EV>> tupleEdges = (DataSet<Tuple3<K, K, EV>>) (DataSet<?>) edges; + DataSet<Tuple2<K, VV>> newVertices = tupleVertices.runOperation( + VertexCentricIteration.withValuedEdges(tupleEdges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations)); return new Graph<K, VV, EV>(newVertices.map(new Tuple2ToVertexMap<K, VV>()), edges, context); }