[FLINK-1201] [gelly] changed spargel classes to work with Vertex and Edge types


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d45c0491
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d45c0491
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d45c0491

Branch: refs/heads/master
Commit: d45c049167ae30ae42fc63a46d9da29df2369652
Parents: b0b1295
Author: vasia <vasilikikala...@gmail.com>
Authored: Thu Jan 8 20:29:14 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 11 10:46:14 2015 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java |  10 +-
 .../apache/flink/graph/library/PageRank.java    |   7 +-
 .../library/SingleSourceShortestPaths.java      |   5 +-
 .../flink/graph/spargel/MessagingFunction.java  |  89 ++-----
 .../flink/graph/spargel/OutgoingEdge.java       |  64 -----
 .../graph/spargel/VertexCentricIteration.java   | 237 ++++---------------
 .../graph/spargel/VertexUpdateFunction.java     |  12 +-
 7 files changed, 83 insertions(+), 341 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d45c0491/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 247f5fc..425a377 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -53,7 +53,6 @@ import flink.graphs.spargel.MessagingFunction;
 import flink.graphs.spargel.VertexCentricIteration;
 import flink.graphs.spargel.VertexUpdateFunction;
 import flink.graphs.utils.GraphUtils;
-import flink.graphs.utils.Tuple2ToVertexMap;
 import flink.graphs.validation.GraphValidator;
 
 /**
@@ -1058,15 +1057,12 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
         * @param maximumNumberOfIterations maximum number of iterations to 
perform
         * @return
         */
-       @SuppressWarnings("unchecked")
        public <M>Graph<K, VV, EV> 
runVertexCentricIteration(VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
                MessagingFunction<K, VV, M, EV> messagingFunction, int 
maximumNumberOfIterations) {
-       DataSet<Tuple2<K, VV>> tupleVertices = (DataSet<Tuple2<K, VV>>) 
(DataSet<?>) vertices;
-       DataSet<Tuple3<K, K, EV>> tupleEdges = (DataSet<Tuple3<K, K, EV>>) 
(DataSet<?>) edges;
-       DataSet<Tuple2<K, VV>> newVertices = tupleVertices.runOperation(
-                            VertexCentricIteration.withValuedEdges(tupleEdges,
+       DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(
+                            VertexCentricIteration.withEdges(edges,
                                                vertexUpdateFunction, 
messagingFunction, maximumNumberOfIterations));
-               return new Graph<K, VV, EV>(newVertices.map(new 
Tuple2ToVertexMap<K, VV>()), edges, context);
+               return new Graph<K, VV, EV>(newVertices, edges, context);
     }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/d45c0491/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index b7ca52b..d29a9dc 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -1,11 +1,10 @@
 package flink.graphs.library;
 
-
+import flink.graphs.Edge;
 import flink.graphs.Graph;
 import flink.graphs.GraphAlgorithm;
 import flink.graphs.spargel.MessageIterator;
 import flink.graphs.spargel.MessagingFunction;
-import flink.graphs.spargel.OutgoingEdge;
 import flink.graphs.spargel.VertexUpdateFunction;
 
 import java.io.Serializable;
@@ -69,8 +68,8 @@ public class PageRank<K extends Comparable<K> & Serializable> 
implements GraphAl
 
         @Override
         public void sendMessages(K vertexId, Double newRank) {
-            for (OutgoingEdge<K, Double> edge : getOutgoingEdges()) {
-                sendMessageTo(edge.target(), newRank * edge.edgeValue());
+            for (Edge<K, Double> edge : getOutgoingEdges()) {
+                sendMessageTo(edge.getTarget(), newRank * edge.getValue());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45c0491/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index c29909c..0da8a90 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -3,7 +3,6 @@ package flink.graphs.library;
 import flink.graphs.*;
 import flink.graphs.spargel.MessageIterator;
 import flink.graphs.spargel.MessagingFunction;
-import flink.graphs.spargel.OutgoingEdge;
 import flink.graphs.spargel.VertexUpdateFunction;
 
 import org.apache.flink.api.common.functions.MapFunction;
@@ -87,8 +86,8 @@ public class SingleSourceShortestPaths<K extends 
Comparable<K> & Serializable> i
 
         @Override
         public void sendMessages(K vertexKey, Double newDistance) throws 
Exception {
-            for (OutgoingEdge<K, Double> edge : getOutgoingEdges()) {
-                sendMessageTo(edge.target(), newDistance + edge.edgeValue());
+            for (Edge<K, Double> edge : getOutgoingEdges()) {
+                sendMessageTo(edge.getTarget(), newDistance + edge.getValue());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45c0491/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
index 52a881e..ab451bb 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
@@ -26,10 +26,11 @@ import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 
+import flink.graphs.Edge;
+
 /**
  * The base class for functions that produce messages between vertices as a 
part of a {@link VertexCentricIteration}.
  * 
@@ -38,7 +39,8 @@ import org.apache.flink.util.Collector;
  * @param <Message> The type of the message sent between vertices along the 
edges.
  * @param <EdgeValue> The type of the values that are associated with the 
edges.
  */
-public abstract class MessagingFunction<VertexKey extends 
Comparable<VertexKey>, VertexValue, Message, EdgeValue> implements Serializable 
{
+public abstract class MessagingFunction<VertexKey extends 
Comparable<VertexKey> & Serializable, 
+       VertexValue extends Serializable, Message, EdgeValue extends 
Serializable> implements Serializable {
 
        private static final long serialVersionUID = 1L;
        
@@ -79,19 +81,13 @@ public abstract class MessagingFunction<VertexKey extends 
Comparable<VertexKey>,
         * @return An iterator with all outgoing edges.
         */
        @SuppressWarnings("unchecked")
-       public Iterable<OutgoingEdge<VertexKey, EdgeValue>> getOutgoingEdges() {
+       public Iterable<Edge<VertexKey, EdgeValue>> getOutgoingEdges() {
                if (edgesUsed) {
                        throw new IllegalStateException("Can use either 
'getOutgoingEdges()' or 'sendMessageToAllTargets()' exactly once.");
                }
                edgesUsed = true;
-               
-               if (this.edgeWithValueIter != null) {
-                       this.edgeWithValueIter.set((Iterator<Tuple3<VertexKey, 
VertexKey, EdgeValue>>) edges);
-                       return this.edgeWithValueIter;
-               } else {
-                       this.edgeNoValueIter.set((Iterator<Tuple2<VertexKey, 
VertexKey>>) edges);
-                       return this.edgeNoValueIter;
-               }
+               this.edgeIterator.set((Iterator<Edge<VertexKey, EdgeValue>>) 
edges);
+               return this.edgeIterator;
        }
        
        /**
@@ -186,22 +182,15 @@ public abstract class MessagingFunction<VertexKey extends 
Comparable<VertexKey>,
        
        private Collector<Tuple2<VertexKey, Message>> out;
        
-       private EdgesIteratorNoEdgeValue<VertexKey, EdgeValue> edgeNoValueIter;
-       
-       private EdgesIteratorWithEdgeValue<VertexKey, EdgeValue> 
edgeWithValueIter;
+       private EdgesIterator<VertexKey, EdgeValue> edgeIterator;
        
        private boolean edgesUsed;
        
        
-       void init(IterationRuntimeContext context, boolean hasEdgeValue) {
+       void init(IterationRuntimeContext context) {
                this.runtimeContext = context;
                this.outValue = new Tuple2<VertexKey, Message>();
-               
-               if (hasEdgeValue) {
-                       this.edgeWithValueIter = new 
EdgesIteratorWithEdgeValue<VertexKey, EdgeValue>();
-               } else {
-                       this.edgeNoValueIter = new 
EdgesIteratorNoEdgeValue<VertexKey, EdgeValue>();
-               }
+               this.edgeIterator = new EdgesIterator<VertexKey, EdgeValue>();
        }
        
        void set(Iterator<?> edges, Collector<Tuple2<VertexKey, Message>> out) {
@@ -210,52 +199,15 @@ public abstract class MessagingFunction<VertexKey extends 
Comparable<VertexKey>,
                this.edgesUsed = false;
        }
        
-       
-       
-       private static final class EdgesIteratorNoEdgeValue<VertexKey extends 
Comparable<VertexKey>, EdgeValue> 
-               implements Iterator<OutgoingEdge<VertexKey, EdgeValue>>, 
Iterable<OutgoingEdge<VertexKey, EdgeValue>>
-       {
-               private Iterator<Tuple2<VertexKey, VertexKey>> input;
-               
-               private OutgoingEdge<VertexKey, EdgeValue> edge = new 
OutgoingEdge<VertexKey, EdgeValue>();
-               
-               
-               void set(Iterator<Tuple2<VertexKey, VertexKey>> input) {
-                       this.input = input;
-               }
-               
-               @Override
-               public boolean hasNext() {
-                       return input.hasNext();
-               }
-
-               @Override
-               public OutgoingEdge<VertexKey, EdgeValue> next() {
-                       Tuple2<VertexKey, VertexKey> next = input.next();
-                       edge.set(next.f1, null);
-                       return edge;
-               }
-
-               @Override
-               public void remove() {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public Iterator<OutgoingEdge<VertexKey, EdgeValue>> iterator() {
-                       return this;
-               }
-       }
-       
-       
-       private static final class EdgesIteratorWithEdgeValue<VertexKey extends 
Comparable<VertexKey>, EdgeValue> 
-               implements Iterator<OutgoingEdge<VertexKey, EdgeValue>>, 
Iterable<OutgoingEdge<VertexKey, EdgeValue>>
+       private static final class EdgesIterator<VertexKey extends 
Comparable<VertexKey> & Serializable, 
+               EdgeValue extends Serializable> 
+               implements Iterator<Edge<VertexKey, EdgeValue>>, 
Iterable<Edge<VertexKey, EdgeValue>>
        {
-               private Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> input;
+               private Iterator<Edge<VertexKey, EdgeValue>> input;
                
-               private OutgoingEdge<VertexKey, EdgeValue> edge = new 
OutgoingEdge<VertexKey, EdgeValue>();
+               private Edge<VertexKey, EdgeValue> edge = new Edge<VertexKey, 
EdgeValue>();
                
-               void set(Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> 
input) {
+               void set(Iterator<Edge<VertexKey, EdgeValue>> input) {
                        this.input = input;
                }
                
@@ -265,9 +217,10 @@ public abstract class MessagingFunction<VertexKey extends 
Comparable<VertexKey>,
                }
 
                @Override
-               public OutgoingEdge<VertexKey, EdgeValue> next() {
-                       Tuple3<VertexKey, VertexKey, EdgeValue> next = 
input.next();
-                       edge.set(next.f1, next.f2);
+               public Edge<VertexKey, EdgeValue> next() {
+                       Edge<VertexKey, EdgeValue> next = input.next();
+                       edge.setTarget(next.f1);
+                       edge.setValue(next.f2);
                        return edge;
                }
 
@@ -276,7 +229,7 @@ public abstract class MessagingFunction<VertexKey extends 
Comparable<VertexKey>,
                        throw new UnsupportedOperationException();
                }
                @Override
-               public Iterator<OutgoingEdge<VertexKey, EdgeValue>> iterator() {
+               public Iterator<Edge<VertexKey, EdgeValue>> iterator() {
                        return this;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45c0491/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/OutgoingEdge.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/OutgoingEdge.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/OutgoingEdge.java
deleted file mode 100644
index 7505409..0000000
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/OutgoingEdge.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package flink.graphs.spargel;
-
-/**
- * <tt>Edge</tt> objects represent edges between vertices. Edges are defined 
by their source and target
- * vertex id. Edges may have an associated value (for example a weight or a 
distance), if the
- * graph algorithm was initialized with the
- * {@link 
VertexCentricIteration#withValuedEdges(org.apache.flink.api.java.DataSet, 
VertexUpdateFunction, MessagingFunction, int)}
- * method.
- *
- * @param <VertexKey> The type of the vertex key.
- * @param <EdgeValue> The type of the value associated with the edge. For 
scenarios where the edges do not hold
- *                    value, this type may be arbitrary.
- */
-public final class OutgoingEdge<VertexKey extends Comparable<VertexKey>, 
EdgeValue> implements java.io.Serializable {
-       
-       private static final long serialVersionUID = 1L;
-       
-       private VertexKey target;
-       
-       private EdgeValue edgeValue;
-       
-       void set(VertexKey target, EdgeValue edgeValue) {
-               this.target = target;
-               this.edgeValue = edgeValue;
-       }
-       
-       /**
-        * Gets the target vertex id.
-        * 
-        * @return The target vertex id.
-        */
-       public VertexKey target() {
-               return target;
-       }
-       
-       /**
-        * Gets the value associated with the edge. The value may be null if 
the iteration was initialized with
-        * an edge data set without edge values.
-        * Typical examples of edge values are weights or distances of the path 
represented by the edge.
-        *  
-        * @return The value associated with the edge.
-        */
-       public EdgeValue edgeValue() {
-               return edgeValue;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d45c0491/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
index b15c8c4..5f89e90 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
@@ -18,6 +18,7 @@
 
 package flink.graphs.spargel;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -33,13 +34,15 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.CoGroupOperator;
 import org.apache.flink.api.java.operators.CustomUnaryOperation;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 
+import flink.graphs.Edge;
+import flink.graphs.Vertex;
+
 /**
  * This class represents iterative graph computations, programmed in a 
vertex-centric perspective.
  * It is a special case of <i>Bulk Synchronous Parallel<i> computation. The 
paradigm has also been
@@ -70,16 +73,15 @@ import org.apache.flink.util.Collector;
  * @param <Message> The type of the message sent between vertices along the 
edges.
  * @param <EdgeValue> The type of the values that are associated with the 
edges.
  */
-public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, 
VertexValue, Message, EdgeValue> 
-       implements CustomUnaryOperation<Tuple2<VertexKey, VertexValue>, 
Tuple2<VertexKey, VertexValue>>
+public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & 
Serializable, VertexValue extends Serializable, 
+       Message, EdgeValue extends Serializable> 
+       implements CustomUnaryOperation<Vertex<VertexKey, VertexValue>, 
Vertex<VertexKey, VertexValue>>
 {
        private final VertexUpdateFunction<VertexKey, VertexValue, Message> 
updateFunction;
        
        private final MessagingFunction<VertexKey, VertexValue, Message, 
EdgeValue> messagingFunction;
        
-       private final DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue;
-       
-       private final DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> 
edgesWithValue;
+       private final DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue;
        
        private final Map<String, Aggregator<?>> aggregators;
        
@@ -91,7 +93,7 @@ public class VertexCentricIteration<VertexKey extends 
Comparable<VertexKey>, Ver
        
        private final TypeInformation<Message> messageType;
        
-       private DataSet<Tuple2<VertexKey, VertexValue>> initialVertices;
+       private DataSet<Vertex<VertexKey, VertexValue>> initialVertices;
        
        private String name;
        
@@ -101,64 +103,21 @@ public class VertexCentricIteration<VertexKey extends 
Comparable<VertexKey>, Ver
        
        // 
----------------------------------------------------------------------------------
        
-       private  VertexCentricIteration(VertexUpdateFunction<VertexKey, 
VertexValue, Message> uf,
-                       MessagingFunction<VertexKey, VertexValue, Message, 
EdgeValue> mf,
-                       DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue,
-                       int maximumNumberOfIterations)
-       {
-               Validate.notNull(uf);
-               Validate.notNull(mf);
-               Validate.notNull(edgesWithoutValue);
-               Validate.isTrue(maximumNumberOfIterations > 0, "The maximum 
number of iterations must be at least one.");
-               
-               // check that the edges are actually a valid tuple set of 
vertex key types
-               TypeInformation<Tuple2<VertexKey, VertexKey>> edgesType = 
edgesWithoutValue.getType();
-               Validate.isTrue(edgesType.isTupleType() && edgesType.getArity() 
== 2, "The edges data set (for edges without edge values) must consist of 
2-tuples.");
-               
-               TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgesType;
-               
Validate.isTrue(tupleInfo.getTypeAt(0).equals(tupleInfo.getTypeAt(1))
-                       && 
Comparable.class.isAssignableFrom(tupleInfo.getTypeAt(0).getTypeClass()),
-                       "Both tuple fields (source and target vertex id) must 
be of the data type that represents the vertex key and implement the 
java.lang.Comparable interface.");
-               
-               this.updateFunction = uf;
-               this.messagingFunction = mf;
-               this.edgesWithoutValue = edgesWithoutValue;
-               this.edgesWithValue = null;
-               this.maximumNumberOfIterations = maximumNumberOfIterations;
-               this.aggregators = new HashMap<String, Aggregator<?>>();
-               
-               this.messageType = getMessageType(mf);
-       }
-       
        private VertexCentricIteration(VertexUpdateFunction<VertexKey, 
VertexValue, Message> uf,
                        MessagingFunction<VertexKey, VertexValue, Message, 
EdgeValue> mf,
-                       DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> 
edgesWithValue, 
-                       int maximumNumberOfIterations,
-                       boolean edgeHasValueMarker)
+                       DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue, 
+                       int maximumNumberOfIterations, boolean 
edgeHasValueMarker)
        {
                Validate.notNull(uf);
                Validate.notNull(mf);
                Validate.notNull(edgesWithValue);
                Validate.isTrue(maximumNumberOfIterations > 0, "The maximum 
number of iterations must be at least one.");
-               
-               // check that the edges are actually a valid tuple set of 
vertex key types
-               TypeInformation<Tuple3<VertexKey, VertexKey, EdgeValue>> 
edgesType = edgesWithValue.getType();
-               Validate.isTrue(edgesType.isTupleType() && edgesType.getArity() 
== 3, "The edges data set (for edges with edge values) must consist of 
3-tuples.");
-               
-               TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgesType;
-               
Validate.isTrue(tupleInfo.getTypeAt(0).equals(tupleInfo.getTypeAt(1))
-                       && 
Comparable.class.isAssignableFrom(tupleInfo.getTypeAt(0).getTypeClass()),
-                       "The first two tuple fields (source and target vertex 
id) must be of the data type that represents the vertex key and implement the 
java.lang.Comparable interface.");
-               
-               Validate.isTrue(maximumNumberOfIterations > 0, "The maximum 
number of iterations must be at least one.");
-               
+
                this.updateFunction = uf;
                this.messagingFunction = mf;
-               this.edgesWithoutValue = null;
                this.edgesWithValue = edgesWithValue;
                this.maximumNumberOfIterations = maximumNumberOfIterations;
-               this.aggregators = new HashMap<String, Aggregator<?>>();
-               
+               this.aggregators = new HashMap<String, Aggregator<?>>();        
        
                this.messageType = getMessageType(mf);
        }
        
@@ -271,20 +230,7 @@ public class VertexCentricIteration<VertexKey extends 
Comparable<VertexKey>, Ver
         * @see 
org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet)
         */
        @Override
-       public void setInput(DataSet<Tuple2<VertexKey, VertexValue>> inputData) 
{
-               // sanity check that we really have two tuples
-               TypeInformation<Tuple2<VertexKey, VertexValue>> inputType = 
inputData.getType();
-               Validate.isTrue(inputType.isTupleType() && inputType.getArity() 
== 2, "The input data set (the initial vertices) must consist of 2-tuples.");
-
-               // check that the key type here is the same as for the edges
-               TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) 
inputType).getTypeAt(0);
-               TypeInformation<?> edgeType = edgesWithoutValue != null ? 
edgesWithoutValue.getType() : edgesWithValue.getType();
-               TypeInformation<VertexKey> edgeKeyType = ((TupleTypeInfo<?>) 
edgeType).getTypeAt(0);
-               
-               Validate.isTrue(keyType.equals(edgeKeyType), "The first tuple 
field (the vertex id) of the input data set (the initial vertices) " +
-                               "must be the same data type as the first fields 
of the edge data set (the source vertex id). " +
-                               "Here, the key type for the vertex ids is '%s' 
and the key type  for the edges is '%s'.", keyType, edgeKeyType);
-
+       public void setInput(DataSet<Vertex<VertexKey, VertexValue>> inputData) 
{
                this.initialVertices = inputData;
        }
        
@@ -294,22 +240,22 @@ public class VertexCentricIteration<VertexKey extends 
Comparable<VertexKey>, Ver
         * @return The operator that represents this vertex-centric graph 
computation.
         */
        @Override
-       public DataSet<Tuple2<VertexKey, VertexValue>> createResult() {
+       public DataSet<Vertex<VertexKey, VertexValue>> createResult() {
                if (this.initialVertices == null) {
                        throw new IllegalStateException("The input data set has 
not been set.");
                }
                
                // prepare some type information
-               TypeInformation<Tuple2<VertexKey, VertexValue>> vertexTypes = 
initialVertices.getType();
+               TypeInformation<Vertex<VertexKey, VertexValue>> vertexTypes = 
initialVertices.getType();
                TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) 
initialVertices.getType()).getTypeAt(0);
-               TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = 
new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType);             
  
-               
+               TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = 
new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType);
+
                // set up the iteration operator
                final String name = (this.name != null) ? this.name :
                        "Vertex-centric iteration (" + updateFunction + " | " + 
messagingFunction + ")";
                final int[] zeroKeyPos = new int[] {0};
        
-               final DeltaIteration<Tuple2<VertexKey, VertexValue>, 
Tuple2<VertexKey, VertexValue>> iteration =
+               final DeltaIteration<Vertex<VertexKey, VertexValue>, 
Vertex<VertexKey, VertexValue>> iteration =
                        this.initialVertices.iterateDelta(this.initialVertices, 
this.maximumNumberOfIterations, zeroKeyPos);
                iteration.name(name);
                iteration.parallelism(parallelism);
@@ -322,14 +268,8 @@ public class VertexCentricIteration<VertexKey extends 
Comparable<VertexKey>, Ver
                
                // build the messaging function (co group)
                CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
-               if (edgesWithoutValue != null) {
-                       MessagingUdfNoEdgeValues<VertexKey, VertexValue, 
Message> messenger = new MessagingUdfNoEdgeValues<VertexKey, VertexValue, 
Message>(messagingFunction, messageTypeInfo);
-                       messages = 
this.edgesWithoutValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
-               }
-               else {
-                       MessagingUdfWithEdgeValues<VertexKey, VertexValue, 
Message, EdgeValue> messenger = new MessagingUdfWithEdgeValues<VertexKey, 
VertexValue, Message, EdgeValue>(messagingFunction, messageTypeInfo);
-                       messages = 
this.edgesWithValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
-               }
+               MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, 
EdgeValue> messenger = new MessagingUdfWithEdgeValues<VertexKey, VertexValue, 
Message, EdgeValue>(messagingFunction, messageTypeInfo);
+               messages = 
this.edgesWithValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
                
                // configure coGroup message function with name and broadcast 
variables
                messages = messages.name("Messaging");
@@ -340,7 +280,7 @@ public class VertexCentricIteration<VertexKey extends 
Comparable<VertexKey>, Ver
                VertexUpdateUdf<VertexKey, VertexValue, Message> updateUdf = 
new VertexUpdateUdf<VertexKey, VertexValue, Message>(updateFunction, 
vertexTypes);
                
                // build the update function (co group)
-               CoGroupOperator<?, ?, Tuple2<VertexKey, VertexValue>> updates =
+               CoGroupOperator<?, ?, Vertex<VertexKey, VertexValue>> updates =
                                
messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
                
                // configure coGroup update function with name and broadcast 
variables
@@ -355,43 +295,12 @@ public class VertexCentricIteration<VertexKey extends 
Comparable<VertexKey>, Ver
                return iteration.closeWith(updates, updates);
                
        }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // Constructor builders to avoid signature conflicts with generic type 
erasure
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Creates a new vertex-centric iteration operator for graphs where the 
edges are not associated with a value.
-        * 
-        * @param edgesWithoutValue The data set containing edges. Edges are 
represented as 2-tuples: (source-id, target-id)
-        * @param vertexUpdateFunction The function that updates the state of 
the vertices from the incoming messages.
-        * @param messagingFunction The function that turns changed vertex 
states into messages along the edges.
-        * 
-        * @param <VertexKey> The type of the vertex key (the vertex 
identifier).
-        * @param <VertexValue> The type of the vertex value (the state of the 
vertex).
-        * @param <Message> The type of the message sent between vertices along 
the edges.
-        * 
-        * @return An in stance of the vertex-centric graph computation 
operator.
-        */
-       public static final <VertexKey extends Comparable<VertexKey>, 
VertexValue, Message>
-                       VertexCentricIteration<VertexKey, VertexValue, Message, 
?> withPlainEdges(
-                                       DataSet<Tuple2<VertexKey, VertexKey>> 
edgesWithoutValue,
-                                               VertexUpdateFunction<VertexKey, 
VertexValue, Message> vertexUpdateFunction,
-                                               MessagingFunction<VertexKey, 
VertexValue, Message, ?> messagingFunction,
-                                               int maximumNumberOfIterations)
-       {
-               @SuppressWarnings("unchecked")
-               MessagingFunction<VertexKey, VertexValue, Message, Object> tmf 
= 
-                                                               
(MessagingFunction<VertexKey, VertexValue, Message, Object>) messagingFunction;
-               
-               return new VertexCentricIteration<VertexKey, VertexValue, 
Message, Object>(vertexUpdateFunction, tmf, edgesWithoutValue, 
maximumNumberOfIterations);
-       }
-       
+
        /**
         * Creates a new vertex-centric iteration operator for graphs where the 
edges are associated with a value (such as
         * a weight or distance).
         * 
-        * @param edgesWithValue The data set containing edges. Edges are 
represented as 2-tuples: (source-id, target-id)
+        * @param edgesWithValue The data set containing edges.
         * @param uf The function that updates the state of the vertices from 
the incoming messages.
         * @param mf The function that turns changed vertex states into 
messages along the edges.
         * 
@@ -402,9 +311,10 @@ public class VertexCentricIteration<VertexKey extends 
Comparable<VertexKey>, Ver
         * 
         * @return An in stance of the vertex-centric graph computation 
operator.
         */
-       public static final <VertexKey extends Comparable<VertexKey>, 
VertexValue, Message, EdgeValue>
-                       VertexCentricIteration<VertexKey, VertexValue, Message, 
EdgeValue> withValuedEdges(
-                                       DataSet<Tuple3<VertexKey, VertexKey, 
EdgeValue>> edgesWithValue,
+       public static final <VertexKey extends Comparable<VertexKey> & 
Serializable, VertexValue extends Serializable, 
+               Message, EdgeValue extends Serializable>
+                       VertexCentricIteration<VertexKey, VertexValue, Message, 
EdgeValue> withEdges(
+                                       DataSet<Edge<VertexKey, EdgeValue>> 
edgesWithValue,
                                        VertexUpdateFunction<VertexKey, 
VertexValue, Message> uf,
                                        MessagingFunction<VertexKey, 
VertexValue, Message, EdgeValue> mf,
                                        int maximumNumberOfIterations)
@@ -416,9 +326,10 @@ public class VertexCentricIteration<VertexKey extends 
Comparable<VertexKey>, Ver
        //  Wrapping UDFs
        // 
--------------------------------------------------------------------------------------------
        
-       private static final class VertexUpdateUdf<VertexKey extends 
Comparable<VertexKey>, VertexValue, Message> 
-               extends RichCoGroupFunction<Tuple2<VertexKey, Message>, 
Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
-               implements ResultTypeQueryable<Tuple2<VertexKey, VertexValue>>
+       private static final class VertexUpdateUdf<VertexKey extends 
Comparable<VertexKey> & Serializable, 
+               VertexValue extends Serializable, Message> 
+               extends RichCoGroupFunction<Tuple2<VertexKey, Message>, 
Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>>
+               implements ResultTypeQueryable<Vertex<VertexKey, VertexValue>>
        {
                private static final long serialVersionUID = 1L;
                
@@ -426,25 +337,25 @@ public class VertexCentricIteration<VertexKey extends 
Comparable<VertexKey>, Ver
 
                private final MessageIterator<Message> messageIter = new 
MessageIterator<Message>();
                
-               private transient TypeInformation<Tuple2<VertexKey, 
VertexValue>> resultType;
+               private transient TypeInformation<Vertex<VertexKey, 
VertexValue>> resultType;
                
                
                private VertexUpdateUdf(VertexUpdateFunction<VertexKey, 
VertexValue, Message> vertexUpdateFunction,
-                               TypeInformation<Tuple2<VertexKey, VertexValue>> 
resultType)
+                               TypeInformation<Vertex<VertexKey, VertexValue>> 
resultType)
                {
                        this.vertexUpdateFunction = vertexUpdateFunction;
                        this.resultType = resultType;
                }
 
                @Override
-               public void coGroup(Iterable<Tuple2<VertexKey, Message>> 
messages, Iterable<Tuple2<VertexKey, VertexValue>> vertex,
-                               Collector<Tuple2<VertexKey, VertexValue>> out)
+               public void coGroup(Iterable<Tuple2<VertexKey, Message>> 
messages, Iterable<Vertex<VertexKey, VertexValue>> vertex,
+                               Collector<Vertex<VertexKey, VertexValue>> out)
                        throws Exception
                {
-                       final Iterator<Tuple2<VertexKey, VertexValue>> 
vertexIter = vertex.iterator();
+                       final Iterator<Vertex<VertexKey, VertexValue>> 
vertexIter = vertex.iterator();
                        
                        if (vertexIter.hasNext()) {
-                               Tuple2<VertexKey, VertexValue> vertexState = 
vertexIter.next();
+                               Vertex<VertexKey, VertexValue> vertexState = 
vertexIter.next();
                                
                                @SuppressWarnings("unchecked")
                                Iterator<Tuple2<?, Message>> downcastIter = 
(Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
@@ -482,71 +393,17 @@ public class VertexCentricIteration<VertexKey extends 
Comparable<VertexKey>, Ver
                }
 
                @Override
-               public TypeInformation<Tuple2<VertexKey, VertexValue>> 
getProducedType() {
+               public TypeInformation<Vertex<VertexKey, VertexValue>> 
getProducedType() {
                        return this.resultType;
                }
        }
-       
-       /*
-        * UDF that encapsulates the message sending function for graphs where 
the edges have no associated values.
-        */
-       private static final class MessagingUdfNoEdgeValues<VertexKey extends 
Comparable<VertexKey>, VertexValue, Message> 
-               extends RichCoGroupFunction<Tuple2<VertexKey, VertexKey>, 
Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
-               implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
-       {
-               private static final long serialVersionUID = 1L;
-               
-               private final MessagingFunction<VertexKey, VertexValue, 
Message, ?> messagingFunction;
-               
-               private transient TypeInformation<Tuple2<VertexKey, Message>> 
resultType;
-               
-               
-               private MessagingUdfNoEdgeValues(MessagingFunction<VertexKey, 
VertexValue, Message, ?> messagingFunction,
-                               TypeInformation<Tuple2<VertexKey, Message>> 
resultType)
-               {
-                       this.messagingFunction = messagingFunction;
-                       this.resultType = resultType;
-               }
-               
-               @Override
-               public void coGroup(Iterable<Tuple2<VertexKey, VertexKey>> 
edges,
-                               Iterable<Tuple2<VertexKey, VertexValue>> state, 
Collector<Tuple2<VertexKey, Message>> out)
-                       throws Exception
-               {
-                       final Iterator<Tuple2<VertexKey, VertexValue>> 
stateIter = state.iterator();
-                       
-                       if (stateIter.hasNext()) {
-                               Tuple2<VertexKey, VertexValue> newVertexState = 
stateIter.next();
-                               messagingFunction.set((Iterator<?>) 
edges.iterator(), out);
-                               
messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
-                       }
-               }
-               
-               @Override
-               public void open(Configuration parameters) throws Exception {
-                       if (getIterationRuntimeContext().getSuperstepNumber() 
== 1) {
-                               
this.messagingFunction.init(getIterationRuntimeContext(), false);
-                       }
-                       
-                       this.messagingFunction.preSuperstep();
-               }
-               
-               @Override
-               public void close() throws Exception {
-                       this.messagingFunction.postSuperstep();
-               }
 
-               @Override
-               public TypeInformation<Tuple2<VertexKey, Message>> 
getProducedType() {
-                       return this.resultType;
-               }
-       }
-       
        /*
         * UDF that encapsulates the message sending function for graphs where 
the edges have an associated value.
         */
-       private static final class MessagingUdfWithEdgeValues<VertexKey extends 
Comparable<VertexKey>, VertexValue, Message, EdgeValue> 
-               extends RichCoGroupFunction<Tuple3<VertexKey, VertexKey, 
EdgeValue>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
+       private static final class MessagingUdfWithEdgeValues<VertexKey extends 
Comparable<VertexKey> & Serializable, 
+               VertexValue extends Serializable, Message, EdgeValue extends 
Serializable> 
+               extends RichCoGroupFunction<Edge<VertexKey, EdgeValue>, 
Vertex<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
                implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
        {
                private static final long serialVersionUID = 1L;
@@ -564,14 +421,14 @@ public class VertexCentricIteration<VertexKey extends 
Comparable<VertexKey>, Ver
                }
 
                @Override
-               public void coGroup(Iterable<Tuple3<VertexKey, VertexKey, 
EdgeValue>> edges,
-                               Iterable<Tuple2<VertexKey, VertexValue>> state, 
Collector<Tuple2<VertexKey, Message>> out)
+               public void coGroup(Iterable<Edge<VertexKey, EdgeValue>> edges,
+                               Iterable<Vertex<VertexKey, VertexValue>> state, 
Collector<Tuple2<VertexKey, Message>> out)
                        throws Exception
                {
-                       final Iterator<Tuple2<VertexKey, VertexValue>> 
stateIter = state.iterator();
+                       final Iterator<Vertex<VertexKey, VertexValue>> 
stateIter = state.iterator();
                        
                        if (stateIter.hasNext()) {
-                               Tuple2<VertexKey, VertexValue> newVertexState = 
stateIter.next();
+                               Vertex<VertexKey, VertexValue> newVertexState = 
stateIter.next();
                                messagingFunction.set((Iterator<?>) 
edges.iterator(), out);
                                
messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
                        }
@@ -580,7 +437,7 @@ public class VertexCentricIteration<VertexKey extends 
Comparable<VertexKey>, Ver
                @Override
                public void open(Configuration parameters) throws Exception {
                        if (getIterationRuntimeContext().getSuperstepNumber() 
== 1) {
-                               
this.messagingFunction.init(getIterationRuntimeContext(), true);
+                               
this.messagingFunction.init(getIterationRuntimeContext());
                        }
                        
                        this.messagingFunction.preSuperstep();

http://git-wip-us.apache.org/repos/asf/flink/blob/d45c0491/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
index c3fd2b1..e30451c 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
@@ -23,10 +23,11 @@ import java.util.Collection;
 
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 
+import flink.graphs.Vertex;
+
 /**
  * This class must be extended by functions that compute the state of the 
vertex depending on the old state and the
  * incoming messages. The central method is {@link #updateVertex(Comparable, 
Object, MessageIterator)}, which is
@@ -36,7 +37,8 @@ import org.apache.flink.util.Collector;
  * <VertexValue> The vertex value type.
  * <Message> The message type.
  */
-public abstract class VertexUpdateFunction<VertexKey extends 
Comparable<VertexKey>, VertexValue, Message> implements Serializable {
+public abstract class VertexUpdateFunction<VertexKey extends 
Comparable<VertexKey> & Serializable, 
+       VertexValue extends Serializable, Message> implements Serializable {
 
        private static final long serialVersionUID = 1L;
        
@@ -129,16 +131,16 @@ public abstract class VertexUpdateFunction<VertexKey 
extends Comparable<VertexKe
        
        private IterationRuntimeContext runtimeContext;
        
-       private Collector<Tuple2<VertexKey, VertexValue>> out;
+       private Collector<Vertex<VertexKey, VertexValue>> out;
        
-       private Tuple2<VertexKey, VertexValue> outVal;
+       private Vertex<VertexKey, VertexValue> outVal;
        
        
        void init(IterationRuntimeContext context) {
                this.runtimeContext = context;
        }
        
-       void setOutput(Tuple2<VertexKey, VertexValue> val, 
Collector<Tuple2<VertexKey, VertexValue>> out) {
+       void setOutput(Vertex<VertexKey, VertexValue> val, 
Collector<Vertex<VertexKey, VertexValue>> out) {
                this.out = out;
                this.outVal = val;
        }

Reply via email to