[FLINK-1201] [gelly] return arbitrary type instead of Tuple2 for the neighbor 
methods


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9a9dfc7c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9a9dfc7c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9a9dfc7c

Branch: refs/heads/master
Commit: 9a9dfc7ca2f79957618821d7572c73710627c9f8
Parents: 4b2c96d
Author: vasia <vasilikikala...@gmail.com>
Authored: Sun Dec 21 16:12:09 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 11 10:46:13 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/graph/EdgesFunction.java   |  2 +-
 .../graph/EdgesFunctionWithVertexValue.java     |  3 +-
 .../main/java/org/apache/flink/graph/Graph.java | 85 ++++++++++----------
 .../apache/flink/graph/NeighborsFunction.java   |  3 +-
 .../graph/NeighborsFunctionWithVertexValue.java |  2 +-
 .../graph/test/TestReduceOnEdgesMethods.java    | 12 +--
 .../graph/test/TestReduceOnNeighborMethods.java |  9 ++-
 7 files changed, 57 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9a9dfc7c/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
index 4bfaff2..11b30fd 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
@@ -8,5 +8,5 @@ import org.apache.flink.api.java.tuple.Tuple2;
 public interface EdgesFunction<K extends Comparable<K> & Serializable, 
        EV extends Serializable, O> extends Function, Serializable {
 
-       Tuple2<K, O> iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> edges) 
throws Exception;
+       O iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> edges) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a9dfc7c/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
index 366c0b1..d50be0a 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
@@ -3,10 +3,9 @@ package flink.graphs;
 import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.java.tuple.Tuple2;
 
 public interface EdgesFunctionWithVertexValue<K extends Comparable<K> & 
Serializable, 
        VV extends Serializable, EV extends Serializable, O> extends Function, 
Serializable {
 
-       Tuple2<K, O> iterateEdges(Vertex<K, VV> v, Iterable<Edge<K, EV>> edges) 
throws Exception;
+       O iterateEdges(Vertex<K, VV> v, Iterable<Edge<K, EV>> edges) throws 
Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a9dfc7c/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index f2f62e3..b5cb2ac 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -336,10 +336,11 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
         * The function applied on the edges has access to the vertex value.
         * @param edgesFunction the function to apply to the neighborhood
         * @param direction the edge direction (in-, out-, all-)
-        * @return a dataset of a Tuple2 with the vertex id and the computed 
value
+        * @param <T> the output type 
+        * @return a dataset of a T
         * @throws IllegalArgumentException
         */
-       public <T> DataSet<Tuple2<K, T>> 
reduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
+       public <T> DataSet<T> reduceOnEdges(EdgesFunctionWithVertexValue<K, VV, 
EV, T> edgesFunction,
                        EdgeDirection direction) throws 
IllegalArgumentException {
                switch (direction) {
                case IN:
@@ -363,10 +364,11 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
         * (not the vertex value).
         * @param edgesFunction the function to apply to the neighborhood
         * @param direction the edge direction (in-, out-, all-)
-        * @return a dataset of a Tuple2 with the vertex id and the computed 
value
+        * @param <T> the output type
+        * @return a dataset of T
         * @throws IllegalArgumentException
         */
-       public <T> DataSet<Tuple2<K, T>> reduceOnEdges(EdgesFunction<K, EV, T> 
edgesFunction,
+       public <T> DataSet<T> reduceOnEdges(EdgesFunction<K, EV, T> 
edgesFunction,
                        EdgeDirection direction) throws 
IllegalArgumentException {
                switch (direction) {
                case IN:
@@ -399,8 +401,8 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
        }
 
        private static final class ApplyGroupReduceFunctionOnAllEdges<K extends 
Comparable<K> & Serializable, 
-               EV extends Serializable, T> implements 
GroupReduceFunction<Tuple2<K, Edge<K, EV>>, Tuple2<K, T>>,
-               ResultTypeQueryable<Tuple2<K, T>> {
+               EV extends Serializable, T> implements 
GroupReduceFunction<Tuple2<K, Edge<K, EV>>, T>,
+               ResultTypeQueryable<T> {
        
                private EdgesFunction<K, EV, T> function;
 
@@ -409,20 +411,19 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
                }
 
                public void reduce(final Iterable<Tuple2<K, Edge<K, EV>>> 
keysWithEdges,
-                               Collector<Tuple2<K, T>> out) throws Exception {
+                               Collector<T> out) throws Exception {
                        out.collect(function.iterateEdges(keysWithEdges));
                }
 
                @Override
-               public TypeInformation<Tuple2<K, T>> getProducedType() {
-                       return new TupleTypeInfo<Tuple2<K, T>>(keyType, 
-                                       
TypeExtractor.createTypeInfo(EdgesFunction.class, function.getClass(), 2, null, 
null));
+               public TypeInformation<T> getProducedType() {
+                       return 
TypeExtractor.createTypeInfo(EdgesFunction.class, function.getClass(), 2, null, 
null);
                }
        }
 
        private static final class ApplyGroupReduceFunction<K extends 
Comparable<K> & Serializable, 
-               EV extends Serializable, T> implements 
GroupReduceFunction<Tuple2<K, Edge<K, EV>>, Tuple2<K, T>>,
-               ResultTypeQueryable<Tuple2<K, T>> {
+               EV extends Serializable, T> implements 
GroupReduceFunction<Tuple2<K, Edge<K, EV>>, T>,
+               ResultTypeQueryable<T> {
 
                private EdgesFunction<K, EV, T> function;
 
@@ -431,15 +432,14 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
                }
 
                public void reduce(Iterable<Tuple2<K, Edge<K, EV>>> edges,
-                               Collector<Tuple2<K, T>> out) throws Exception {
+                               Collector<T> out) throws Exception {
                        out.collect(function.iterateEdges(edges));
                        
                }
 
                @Override
-               public TypeInformation<Tuple2<K, T>> getProducedType() {
-                       return new TupleTypeInfo<Tuple2<K, T>>(keyType, 
-                                       
TypeExtractor.createTypeInfo(EdgesFunction.class, function.getClass(), 2, null, 
null));
+               public TypeInformation<T> getProducedType() {
+                       return 
TypeExtractor.createTypeInfo(EdgesFunction.class, function.getClass(), 2, null, 
null);
                }       
        }
 
@@ -463,8 +463,8 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
 
        private static final class ApplyCoGroupFunction<K extends Comparable<K> 
& Serializable, 
                VV extends Serializable, EV extends Serializable, T> 
-               implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, 
Tuple2<K, T>>,
-               ResultTypeQueryable<Tuple2<K, T>> {
+               implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, T>,
+               ResultTypeQueryable<T> {
                
                private EdgesFunctionWithVertexValue<K, VV, EV, T> function;
                
@@ -472,20 +472,19 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
                        this.function = fun;
                }
                public void coGroup(Iterable<Vertex<K, VV>> vertex,
-                               Iterable<Edge<K, EV>> edges, 
Collector<Tuple2<K, T>> out) throws Exception {
+                               Iterable<Edge<K, EV>> edges, Collector<T> out) 
throws Exception {
                        
out.collect(function.iterateEdges(vertex.iterator().next(), edges));
                }
                @Override
-               public TypeInformation<Tuple2<K, T>> getProducedType() {
-                       return new TupleTypeInfo<Tuple2<K, T>>(keyType, 
-                                       
TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, 
function.getClass(), 3, null, null));
+               public TypeInformation<T> getProducedType() {
+                       return 
TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, 
function.getClass(), 3, null, null);
                }
        }
 
        private static final class ApplyCoGroupFunctionOnAllEdges<K extends 
Comparable<K> & Serializable, 
                VV extends Serializable, EV extends Serializable, T> 
-               implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, Edge<K, 
EV>>, Tuple2<K, T>>,
-               ResultTypeQueryable<Tuple2<K, T>> {
+               implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, Edge<K, 
EV>>, T>,
+               ResultTypeQueryable<T> {
 
        private EdgesFunctionWithVertexValue<K, VV, EV, T> function;
 
@@ -494,7 +493,7 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
        }
 
        public void coGroup(Iterable<Vertex<K, VV>> vertex, final 
Iterable<Tuple2<K, Edge<K, EV>>> keysWithEdges, 
-                       Collector<Tuple2<K, T>> out) throws Exception {
+                       Collector<T> out) throws Exception {
 
                final Iterator<Edge<K, EV>> edgesIterator = new 
Iterator<Edge<K,EV>>() {
 
@@ -526,9 +525,8 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
        }
 
        @Override
-       public TypeInformation<Tuple2<K, T>> getProducedType() {
-               return new TupleTypeInfo<Tuple2<K, T>>(keyType, 
-                               
TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, 
function.getClass(), 3, null, null));
+       public TypeInformation<T> getProducedType() {
+               return 
TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, 
function.getClass(), 3, null, null);
        }
 }
 
@@ -993,10 +991,11 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
         * The function applied on the neighbors has access to the vertex value.
         * @param neighborsFunction the function to apply to the neighborhood
         * @param direction the edge direction (in-, out-, all-)
-        * @return a dataset of a Tuple2 with the vertex id and the computed 
value
+        * @param <T> the output type
+        * @return a dataset of a T
         * @throws IllegalArgumentException
         */
-       public <T> DataSet<Tuple2<K, T>> 
reduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> 
neighborsFunction,
+       public <T> DataSet<T> 
reduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> 
neighborsFunction,
                        EdgeDirection direction) throws 
IllegalArgumentException {
                switch (direction) {
                case IN:
@@ -1036,8 +1035,8 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
 
        private static final class ApplyNeighborCoGroupFunction<K extends 
Comparable<K> & Serializable, 
                VV extends Serializable, EV extends Serializable, T> 
-               implements CoGroupFunction<Vertex<K, VV>, Tuple2<Edge<K, EV>, 
Vertex<K, VV>>, Tuple2<K, T>>,
-               ResultTypeQueryable<Tuple2<K, T>> {
+               implements CoGroupFunction<Vertex<K, VV>, Tuple2<Edge<K, EV>, 
Vertex<K, VV>>, T>,
+               ResultTypeQueryable<T> {
        
                private NeighborsFunctionWithVertexValue<K, VV, EV, T> function;
                
@@ -1045,21 +1044,20 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
                        this.function = fun;
                }
                public void coGroup(Iterable<Vertex<K, VV>> vertex,
-                               Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> 
neighbors, Collector<Tuple2<K, T>> out) throws Exception {
+                               Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> 
neighbors, Collector<T> out) throws Exception {
                        
out.collect(function.iterateNeighbors(vertex.iterator().next(), neighbors));
                }
                @Override
-               public TypeInformation<Tuple2<K, T>> getProducedType() {
-                       return new TupleTypeInfo<Tuple2<K, T>>(keyType, 
-                                       
TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, 
-                                                       function.getClass(), 3, 
null, null));
+               public TypeInformation<T> getProducedType() {
+                       return 
TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, 
+                                                       function.getClass(), 3, 
null, null);
                }
        }
 
        private static final class ApplyCoGroupFunctionOnAllNeighbors<K extends 
Comparable<K> & Serializable, 
                VV extends Serializable, EV extends Serializable, T> 
-               implements CoGroupFunction<Vertex<K, VV>, Tuple3<K, Edge<K, 
EV>, Vertex<K, VV>>, Tuple2<K, T>>,
-               ResultTypeQueryable<Tuple2<K, T>> {
+               implements CoGroupFunction<Vertex<K, VV>, Tuple3<K, Edge<K, 
EV>, Vertex<K, VV>>, T>,
+               ResultTypeQueryable<T> {
 
                private NeighborsFunctionWithVertexValue<K, VV, EV, T> function;
                
@@ -1068,7 +1066,7 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
                }
 
                public void coGroup(Iterable<Vertex<K, VV>> vertex, final 
Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keysWithNeighbors, 
-                               Collector<Tuple2<K, T>> out) throws Exception {
+                               Collector<T> out) throws Exception {
                
                        final Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>> 
neighborsIterator = new Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>>() {
                
@@ -1102,10 +1100,9 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
                        }
 
                @Override
-               public TypeInformation<Tuple2<K, T>> getProducedType() {
-                       return new TupleTypeInfo<Tuple2<K, T>>(keyType, 
-                                       
TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, 
-                                                       function.getClass(), 3, 
null, null));
+               public TypeInformation<T> getProducedType() {
+                       return 
TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, 
+                                                       function.getClass(), 3, 
null, null);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a9dfc7c/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
index 5158078..63bc527 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
@@ -3,11 +3,10 @@ package flink.graphs;
 import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 
 public interface NeighborsFunction<K extends Comparable<K> & Serializable, VV 
extends Serializable, 
        EV extends Serializable, O> extends Function, Serializable {
 
-       Tuple2<K, O> iterateEdges(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, 
VV>>> neighbors) throws Exception;
+       O iterateEdges(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> 
neighbors) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a9dfc7c/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
index ae7ef2e..d7b438c 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
@@ -8,5 +8,5 @@ import org.apache.flink.api.java.tuple.Tuple2;
 public interface NeighborsFunctionWithVertexValue<K extends Comparable<K> & 
Serializable, VV extends Serializable, 
        EV extends Serializable, O> extends Function, Serializable {
 
-       Tuple2<K, O> iterateNeighbors(Vertex<K, VV> vertex, 
Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors) throws Exception;
+       O iterateNeighbors(Vertex<K, VV> vertex, Iterable<Tuple2<Edge<K, EV>, 
Vertex<K, VV>>> neighbors) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a9dfc7c/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java
index 1fdccfa..1081bd4 100644
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java
@@ -72,7 +72,7 @@ public class TestReduceOnEdgesMethods extends 
JavaProgramTestBase {
                                                
TestGraphUtils.getLongLongEdgeData(env), env);
 
                                DataSet<Tuple2<Long, Long>> 
verticesWithLowestOutNeighbor = 
-                                               graph.reduceOnEdges(new 
EdgesFunctionWithVertexValue<Long, Long, Long, Long>() {
+                                               graph.reduceOnEdges(new 
EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>>() {
 
                                        public Tuple2<Long, Long> iterateEdges(
                                                        Vertex<Long, Long> v,
@@ -108,7 +108,7 @@ public class TestReduceOnEdgesMethods extends 
JavaProgramTestBase {
                                                
TestGraphUtils.getLongLongEdgeData(env), env);
 
                                DataSet<Tuple2<Long, Long>> 
verticesWithLowestOutNeighbor = 
-                                               graph.reduceOnEdges(new 
EdgesFunctionWithVertexValue<Long, Long, Long, Long>() {
+                                               graph.reduceOnEdges(new 
EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>>() {
 
                                        public Tuple2<Long, Long> iterateEdges(
                                                        Vertex<Long, Long> v,
@@ -144,7 +144,7 @@ public class TestReduceOnEdgesMethods extends 
JavaProgramTestBase {
                                                
TestGraphUtils.getLongLongEdgeData(env), env);
 
                                DataSet<Tuple2<Long, Long>> 
verticesWithMaxEdgeWeight = 
-                                               graph.reduceOnEdges(new 
EdgesFunctionWithVertexValue<Long, Long, Long, Long>() {
+                                               graph.reduceOnEdges(new 
EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>>() {
 
                                        public Tuple2<Long, Long> 
iterateEdges(Vertex<Long, Long> v,
                                                        Iterable<Edge<Long, 
Long>> edges) {
@@ -177,7 +177,7 @@ public class TestReduceOnEdgesMethods extends 
JavaProgramTestBase {
                                                
TestGraphUtils.getLongLongEdgeData(env), env);
 
                                DataSet<Tuple2<Long, Long>> 
verticesWithLowestOutNeighbor = 
-                                               graph.reduceOnEdges(new 
EdgesFunction<Long, Long, Long>() {
+                                               graph.reduceOnEdges(new 
EdgesFunction<Long, Long, Tuple2<Long, Long>>() {
 
                                        public Tuple2<Long, Long> 
iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
 
@@ -216,7 +216,7 @@ public class TestReduceOnEdgesMethods extends 
JavaProgramTestBase {
                                                
TestGraphUtils.getLongLongEdgeData(env), env);
 
                                DataSet<Tuple2<Long, Long>> 
verticesWithLowestOutNeighbor = 
-                                               graph.reduceOnEdges(new 
EdgesFunction<Long, Long, Long>() {
+                                               graph.reduceOnEdges(new 
EdgesFunction<Long, Long, Tuple2<Long, Long>>() {
 
                                        public Tuple2<Long, Long> 
iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
                                                
@@ -255,7 +255,7 @@ public class TestReduceOnEdgesMethods extends 
JavaProgramTestBase {
                                                
TestGraphUtils.getLongLongEdgeData(env), env);
 
                                DataSet<Tuple2<Long, Long>> 
verticesWithMaxEdgeWeight = 
-                                               graph.reduceOnEdges(new 
EdgesFunction<Long, Long, Long>() {
+                                               graph.reduceOnEdges(new 
EdgesFunction<Long, Long, Tuple2<Long, Long>>() {
 
                                        public Tuple2<Long, Long> 
iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
                                                

http://git-wip-us.apache.org/repos/asf/flink/blob/9a9dfc7c/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java
index b8927aa..7a1413a 100644
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java
@@ -72,7 +72,8 @@ public class TestReduceOnNeighborMethods extends 
JavaProgramTestBase {
                                                
TestGraphUtils.getLongLongEdgeData(env), env);
 
                                DataSet<Tuple2<Long, Long>> 
verticesWithSumOfOutNeighborValues = 
-                                               graph.reduceOnNeighbors(new 
NeighborsFunctionWithVertexValue<Long, Long, Long, Long>() {
+                                               graph.reduceOnNeighbors(new 
NeighborsFunctionWithVertexValue<Long, Long, Long, 
+                                                               Tuple2<Long, 
Long>>() {
                                                        public Tuple2<Long, 
Long> iterateNeighbors(Vertex<Long, Long> vertex,
                                                                        
Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
                                                                long sum = 0;
@@ -101,7 +102,8 @@ public class TestReduceOnNeighborMethods extends 
JavaProgramTestBase {
                                                
TestGraphUtils.getLongLongEdgeData(env), env);
 
                                DataSet<Tuple2<Long, Long>> verticesWithSum = 
-                                               graph.reduceOnNeighbors(new 
NeighborsFunctionWithVertexValue<Long, Long, Long, Long>() {
+                                               graph.reduceOnNeighbors(new 
NeighborsFunctionWithVertexValue<Long, Long, Long, 
+                                                               Tuple2<Long, 
Long>>() {
                                                        public Tuple2<Long, 
Long> iterateNeighbors(Vertex<Long, Long> vertex,
                                                                        
Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
                                                                long sum = 0;
@@ -130,7 +132,8 @@ public class TestReduceOnNeighborMethods extends 
JavaProgramTestBase {
                                                
TestGraphUtils.getLongLongEdgeData(env), env);
 
                                DataSet<Tuple2<Long, Long>> 
verticesWithSumOfOutNeighborValues = 
-                                               graph.reduceOnNeighbors(new 
NeighborsFunctionWithVertexValue<Long, Long, Long, Long>() {
+                                               graph.reduceOnNeighbors(new 
NeighborsFunctionWithVertexValue<Long, Long, Long, 
+                                                               Tuple2<Long, 
Long>>() {
                                                        public Tuple2<Long, 
Long> iterateNeighbors(Vertex<Long, Long> vertex,
                                                                        
Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
                                                                long sum = 0;

Reply via email to