[FLINK-1201] [gelly] return arbitrary type instead of Tuple2 for the neighbor methods
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9a9dfc7c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9a9dfc7c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9a9dfc7c Branch: refs/heads/master Commit: 9a9dfc7ca2f79957618821d7572c73710627c9f8 Parents: 4b2c96d Author: vasia <vasilikikala...@gmail.com> Authored: Sun Dec 21 16:12:09 2014 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 11 10:46:13 2015 +0100 ---------------------------------------------------------------------- .../org/apache/flink/graph/EdgesFunction.java | 2 +- .../graph/EdgesFunctionWithVertexValue.java | 3 +- .../main/java/org/apache/flink/graph/Graph.java | 85 ++++++++++---------- .../apache/flink/graph/NeighborsFunction.java | 3 +- .../graph/NeighborsFunctionWithVertexValue.java | 2 +- .../graph/test/TestReduceOnEdgesMethods.java | 12 +-- .../graph/test/TestReduceOnNeighborMethods.java | 9 ++- 7 files changed, 57 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9a9dfc7c/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java index 4bfaff2..11b30fd 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java @@ -8,5 +8,5 @@ import org.apache.flink.api.java.tuple.Tuple2; public interface EdgesFunction<K extends Comparable<K> & Serializable, EV extends Serializable, O> extends Function, Serializable { - Tuple2<K, O> iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> edges) throws Exception; + O iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> edges) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/9a9dfc7c/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java index 366c0b1..d50be0a 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java @@ -3,10 +3,9 @@ package flink.graphs; import java.io.Serializable; import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.java.tuple.Tuple2; public interface EdgesFunctionWithVertexValue<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, O> extends Function, Serializable { - Tuple2<K, O> iterateEdges(Vertex<K, VV> v, Iterable<Edge<K, EV>> edges) throws Exception; + O iterateEdges(Vertex<K, VV> v, Iterable<Edge<K, EV>> edges) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/9a9dfc7c/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index f2f62e3..b5cb2ac 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -336,10 +336,11 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab * The function applied on the edges has access to the vertex value. * @param edgesFunction the function to apply to the neighborhood * @param direction the edge direction (in-, out-, all-) - * @return a dataset of a Tuple2 with the vertex id and the computed value + * @param <T> the output type + * @return a dataset of a T * @throws IllegalArgumentException */ - public <T> DataSet<Tuple2<K, T>> reduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction, + public <T> DataSet<T> reduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction, EdgeDirection direction) throws IllegalArgumentException { switch (direction) { case IN: @@ -363,10 +364,11 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab * (not the vertex value). * @param edgesFunction the function to apply to the neighborhood * @param direction the edge direction (in-, out-, all-) - * @return a dataset of a Tuple2 with the vertex id and the computed value + * @param <T> the output type + * @return a dataset of T * @throws IllegalArgumentException */ - public <T> DataSet<Tuple2<K, T>> reduceOnEdges(EdgesFunction<K, EV, T> edgesFunction, + public <T> DataSet<T> reduceOnEdges(EdgesFunction<K, EV, T> edgesFunction, EdgeDirection direction) throws IllegalArgumentException { switch (direction) { case IN: @@ -399,8 +401,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } private static final class ApplyGroupReduceFunctionOnAllEdges<K extends Comparable<K> & Serializable, - EV extends Serializable, T> implements GroupReduceFunction<Tuple2<K, Edge<K, EV>>, Tuple2<K, T>>, - ResultTypeQueryable<Tuple2<K, T>> { + EV extends Serializable, T> implements GroupReduceFunction<Tuple2<K, Edge<K, EV>>, T>, + ResultTypeQueryable<T> { private EdgesFunction<K, EV, T> function; @@ -409,20 +411,19 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } public void reduce(final Iterable<Tuple2<K, Edge<K, EV>>> keysWithEdges, - Collector<Tuple2<K, T>> out) throws Exception { + Collector<T> out) throws Exception { out.collect(function.iterateEdges(keysWithEdges)); } @Override - public TypeInformation<Tuple2<K, T>> getProducedType() { - return new TupleTypeInfo<Tuple2<K, T>>(keyType, - TypeExtractor.createTypeInfo(EdgesFunction.class, function.getClass(), 2, null, null)); + public TypeInformation<T> getProducedType() { + return TypeExtractor.createTypeInfo(EdgesFunction.class, function.getClass(), 2, null, null); } } private static final class ApplyGroupReduceFunction<K extends Comparable<K> & Serializable, - EV extends Serializable, T> implements GroupReduceFunction<Tuple2<K, Edge<K, EV>>, Tuple2<K, T>>, - ResultTypeQueryable<Tuple2<K, T>> { + EV extends Serializable, T> implements GroupReduceFunction<Tuple2<K, Edge<K, EV>>, T>, + ResultTypeQueryable<T> { private EdgesFunction<K, EV, T> function; @@ -431,15 +432,14 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } public void reduce(Iterable<Tuple2<K, Edge<K, EV>>> edges, - Collector<Tuple2<K, T>> out) throws Exception { + Collector<T> out) throws Exception { out.collect(function.iterateEdges(edges)); } @Override - public TypeInformation<Tuple2<K, T>> getProducedType() { - return new TupleTypeInfo<Tuple2<K, T>>(keyType, - TypeExtractor.createTypeInfo(EdgesFunction.class, function.getClass(), 2, null, null)); + public TypeInformation<T> getProducedType() { + return TypeExtractor.createTypeInfo(EdgesFunction.class, function.getClass(), 2, null, null); } } @@ -463,8 +463,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab private static final class ApplyCoGroupFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, T> - implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, Tuple2<K, T>>, - ResultTypeQueryable<Tuple2<K, T>> { + implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, T>, + ResultTypeQueryable<T> { private EdgesFunctionWithVertexValue<K, VV, EV, T> function; @@ -472,20 +472,19 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab this.function = fun; } public void coGroup(Iterable<Vertex<K, VV>> vertex, - Iterable<Edge<K, EV>> edges, Collector<Tuple2<K, T>> out) throws Exception { + Iterable<Edge<K, EV>> edges, Collector<T> out) throws Exception { out.collect(function.iterateEdges(vertex.iterator().next(), edges)); } @Override - public TypeInformation<Tuple2<K, T>> getProducedType() { - return new TupleTypeInfo<Tuple2<K, T>>(keyType, - TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, function.getClass(), 3, null, null)); + public TypeInformation<T> getProducedType() { + return TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, function.getClass(), 3, null, null); } } private static final class ApplyCoGroupFunctionOnAllEdges<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, T> - implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, Edge<K, EV>>, Tuple2<K, T>>, - ResultTypeQueryable<Tuple2<K, T>> { + implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, Edge<K, EV>>, T>, + ResultTypeQueryable<T> { private EdgesFunctionWithVertexValue<K, VV, EV, T> function; @@ -494,7 +493,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } public void coGroup(Iterable<Vertex<K, VV>> vertex, final Iterable<Tuple2<K, Edge<K, EV>>> keysWithEdges, - Collector<Tuple2<K, T>> out) throws Exception { + Collector<T> out) throws Exception { final Iterator<Edge<K, EV>> edgesIterator = new Iterator<Edge<K,EV>>() { @@ -526,9 +525,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } @Override - public TypeInformation<Tuple2<K, T>> getProducedType() { - return new TupleTypeInfo<Tuple2<K, T>>(keyType, - TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, function.getClass(), 3, null, null)); + public TypeInformation<T> getProducedType() { + return TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, function.getClass(), 3, null, null); } } @@ -993,10 +991,11 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab * The function applied on the neighbors has access to the vertex value. * @param neighborsFunction the function to apply to the neighborhood * @param direction the edge direction (in-, out-, all-) - * @return a dataset of a Tuple2 with the vertex id and the computed value + * @param <T> the output type + * @return a dataset of a T * @throws IllegalArgumentException */ - public <T> DataSet<Tuple2<K, T>> reduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction, + public <T> DataSet<T> reduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction, EdgeDirection direction) throws IllegalArgumentException { switch (direction) { case IN: @@ -1036,8 +1035,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab private static final class ApplyNeighborCoGroupFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, T> - implements CoGroupFunction<Vertex<K, VV>, Tuple2<Edge<K, EV>, Vertex<K, VV>>, Tuple2<K, T>>, - ResultTypeQueryable<Tuple2<K, T>> { + implements CoGroupFunction<Vertex<K, VV>, Tuple2<Edge<K, EV>, Vertex<K, VV>>, T>, + ResultTypeQueryable<T> { private NeighborsFunctionWithVertexValue<K, VV, EV, T> function; @@ -1045,21 +1044,20 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab this.function = fun; } public void coGroup(Iterable<Vertex<K, VV>> vertex, - Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors, Collector<Tuple2<K, T>> out) throws Exception { + Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors, Collector<T> out) throws Exception { out.collect(function.iterateNeighbors(vertex.iterator().next(), neighbors)); } @Override - public TypeInformation<Tuple2<K, T>> getProducedType() { - return new TupleTypeInfo<Tuple2<K, T>>(keyType, - TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, - function.getClass(), 3, null, null)); + public TypeInformation<T> getProducedType() { + return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, + function.getClass(), 3, null, null); } } private static final class ApplyCoGroupFunctionOnAllNeighbors<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, T> - implements CoGroupFunction<Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, Tuple2<K, T>>, - ResultTypeQueryable<Tuple2<K, T>> { + implements CoGroupFunction<Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>, + ResultTypeQueryable<T> { private NeighborsFunctionWithVertexValue<K, VV, EV, T> function; @@ -1068,7 +1066,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } public void coGroup(Iterable<Vertex<K, VV>> vertex, final Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keysWithNeighbors, - Collector<Tuple2<K, T>> out) throws Exception { + Collector<T> out) throws Exception { final Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighborsIterator = new Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>>() { @@ -1102,10 +1100,9 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } @Override - public TypeInformation<Tuple2<K, T>> getProducedType() { - return new TupleTypeInfo<Tuple2<K, T>>(keyType, - TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, - function.getClass(), 3, null, null)); + public TypeInformation<T> getProducedType() { + return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, + function.getClass(), 3, null, null); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/9a9dfc7c/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java index 5158078..63bc527 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java @@ -3,11 +3,10 @@ package flink.graphs; import java.io.Serializable; import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; public interface NeighborsFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, O> extends Function, Serializable { - Tuple2<K, O> iterateEdges(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> neighbors) throws Exception; + O iterateEdges(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> neighbors) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/9a9dfc7c/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java index ae7ef2e..d7b438c 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java @@ -8,5 +8,5 @@ import org.apache.flink.api.java.tuple.Tuple2; public interface NeighborsFunctionWithVertexValue<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, O> extends Function, Serializable { - Tuple2<K, O> iterateNeighbors(Vertex<K, VV> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors) throws Exception; + O iterateNeighbors(Vertex<K, VV> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/9a9dfc7c/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java index 1fdccfa..1081bd4 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java @@ -72,7 +72,7 @@ public class TestReduceOnEdgesMethods extends JavaProgramTestBase { TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new EdgesFunctionWithVertexValue<Long, Long, Long, Long>() { + graph.reduceOnEdges(new EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>>() { public Tuple2<Long, Long> iterateEdges( Vertex<Long, Long> v, @@ -108,7 +108,7 @@ public class TestReduceOnEdgesMethods extends JavaProgramTestBase { TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new EdgesFunctionWithVertexValue<Long, Long, Long, Long>() { + graph.reduceOnEdges(new EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>>() { public Tuple2<Long, Long> iterateEdges( Vertex<Long, Long> v, @@ -144,7 +144,7 @@ public class TestReduceOnEdgesMethods extends JavaProgramTestBase { TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = - graph.reduceOnEdges(new EdgesFunctionWithVertexValue<Long, Long, Long, Long>() { + graph.reduceOnEdges(new EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>>() { public Tuple2<Long, Long> iterateEdges(Vertex<Long, Long> v, Iterable<Edge<Long, Long>> edges) { @@ -177,7 +177,7 @@ public class TestReduceOnEdgesMethods extends JavaProgramTestBase { TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new EdgesFunction<Long, Long, Long>() { + graph.reduceOnEdges(new EdgesFunction<Long, Long, Tuple2<Long, Long>>() { public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { @@ -216,7 +216,7 @@ public class TestReduceOnEdgesMethods extends JavaProgramTestBase { TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new EdgesFunction<Long, Long, Long>() { + graph.reduceOnEdges(new EdgesFunction<Long, Long, Tuple2<Long, Long>>() { public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { @@ -255,7 +255,7 @@ public class TestReduceOnEdgesMethods extends JavaProgramTestBase { TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = - graph.reduceOnEdges(new EdgesFunction<Long, Long, Long>() { + graph.reduceOnEdges(new EdgesFunction<Long, Long, Tuple2<Long, Long>>() { public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { http://git-wip-us.apache.org/repos/asf/flink/blob/9a9dfc7c/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java index b8927aa..7a1413a 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java @@ -72,7 +72,8 @@ public class TestReduceOnNeighborMethods extends JavaProgramTestBase { TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new NeighborsFunctionWithVertexValue<Long, Long, Long, Long>() { + graph.reduceOnNeighbors(new NeighborsFunctionWithVertexValue<Long, Long, Long, + Tuple2<Long, Long>>() { public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex, Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { long sum = 0; @@ -101,7 +102,8 @@ public class TestReduceOnNeighborMethods extends JavaProgramTestBase { TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithSum = - graph.reduceOnNeighbors(new NeighborsFunctionWithVertexValue<Long, Long, Long, Long>() { + graph.reduceOnNeighbors(new NeighborsFunctionWithVertexValue<Long, Long, Long, + Tuple2<Long, Long>>() { public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex, Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { long sum = 0; @@ -130,7 +132,8 @@ public class TestReduceOnNeighborMethods extends JavaProgramTestBase { TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new NeighborsFunctionWithVertexValue<Long, Long, Long, Long>() { + graph.reduceOnNeighbors(new NeighborsFunctionWithVertexValue<Long, Long, Long, + Tuple2<Long, Long>>() { public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex, Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { long sum = 0;