[FLINK-1201] [gelly] reduceOnNeighbors with vertex value

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b2c96d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b2c96d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b2c96d9

Branch: refs/heads/master
Commit: 4b2c96d9cc87237b60e92855848d822e411e4472
Parents: b5b63d7
Author: vasia <vasilikikala...@gmail.com>
Authored: Sun Dec 21 15:46:40 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 11 10:46:13 2015 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java | 130 +++++++++
 .../apache/flink/graph/NeighborsFunction.java   |  13 +
 .../graph/NeighborsFunctionWithVertexValue.java |  12 +
 .../flink/graph/test/TestNeighborMethods.java   | 290 -------------------
 .../graph/test/TestReduceOnEdgesMethods.java    | 290 +++++++++++++++++++
 .../graph/test/TestReduceOnNeighborMethods.java | 157 ++++++++++
 6 files changed, 602 insertions(+), 290 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4b2c96d9/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 48b45de..f2f62e3 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -41,6 +41,7 @@ import org.apache.flink.api.java.io.CsvReader;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -451,6 +452,15 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
                }
        }
 
+       private static final class EmitOneEdgeWithNeighborPerNode<K extends 
Comparable<K> & Serializable, 
+               VV extends Serializable, EV extends Serializable> implements 
FlatMapFunction<
+               Edge<K, EV>, Tuple3<K, K, Edge<K, EV>>> { // used by reduceOnNeighbors(ALL): emits every edge twice, once keyed by each of its endpoints
+               public void flatMap(Edge<K, EV> edge, Collector<Tuple3<K, K, 
Edge<K, EV>>> out) { // output layout: (f0 = id of the vertex owning the neighborhood, f1 = id of its neighbor, f2 = the edge)
+                       out.collect(new Tuple3<K, K, Edge<K, 
EV>>(edge.getSource(), edge.getTarget(), edge)); // record for the source: its neighbor is the target
+                       out.collect(new Tuple3<K, K, Edge<K, 
EV>>(edge.getTarget(), edge.getSource(), edge)); // record for the target: its neighbor is the source
+               }
+       }
+
        private static final class ApplyCoGroupFunction<K extends Comparable<K> 
& Serializable, 
                VV extends Serializable, EV extends Serializable, T> 
                implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, 
Tuple2<K, T>>,
@@ -978,4 +988,124 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
                return algorithm.run(this);
        }
 
+       /**
+        * Compute an aggregate over the neighbors (edges and vertices) of each 
vertex.
+        * The function applied on the neighbors has access to the vertex value.
+        * @param neighborsFunction the function to apply to the neighborhood; it receives the vertex and its (edge, neighbor vertex) pairs
+        * @param direction the edge direction (IN, OUT or ALL) that selects which neighbors are visited
+        * @return a dataset of a Tuple2 with the vertex id and the computed 
value
+        * @throws IllegalArgumentException if the direction is none of IN, OUT, ALL (default branch of the switch)
+        */
+       public <T> DataSet<Tuple2<K, T>> 
reduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> 
neighborsFunction,
+                       EdgeDirection direction) throws 
IllegalArgumentException {
+               switch (direction) {
+               case IN:
+                       // create <edge, sourceVertex> pairs: join edge field 0 with the vertex id (vertex field 0)
+                       DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> 
edgesWithSources = edges.join(this.vertices)
+                               .where(0).equalTo(0);
+                       return 
vertices.coGroup(edgesWithSources).where(0).equalTo("f0.f1").with( // group the pairs under the vertex whose id equals field 1 of the edge (the endpoint that was not joined above)
+                                       new ApplyNeighborCoGroupFunction<K, VV, 
EV, T>(neighborsFunction));
+               case OUT:
+                       // create <edge, targetVertex> pairs: join edge field 1 with the vertex id (vertex field 0)
+                       DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> 
edgesWithTargets = edges.join(this.vertices)
+                               .where(1).equalTo(0);
+                       return 
vertices.coGroup(edgesWithTargets).where(0).equalTo("f0.f0").with( // group the pairs under the vertex whose id equals field 0 of the edge (the endpoint that was not joined above)
+                                       new ApplyNeighborCoGroupFunction<K, VV, 
EV, T>(neighborsFunction));
+               case ALL:
+                       // create <vertexId, edge, neighborVertex> triples: every edge is emitted once per endpoint, then joined with the neighbor vertex on the neighbor id (field 1)
+                       DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> 
edgesWithNeighbors = edges.flatMap(
+                                       new EmitOneEdgeWithNeighborPerNode<K, 
VV, EV>()).join(this.vertices)
+                                       .where(1).equalTo(0).with(new 
ProjectEdgeWithNeighbor<K, VV, EV>());
+
+                       return 
vertices.coGroup(edgesWithNeighbors).where(0).equalTo(0) // field 0 of each triple is the id of the vertex owning the neighborhood
+                                       .with(new 
ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>(neighborsFunction));
+               default:
+                       throw new IllegalArgumentException("Illegal edge 
direction");
+               }
+       }
+
+       private static final class ProjectEdgeWithNeighbor<K extends 
Comparable<K> & Serializable, 
+               VV extends Serializable, EV extends Serializable> implements 
+               FlatJoinFunction<Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, 
Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> { // join function of reduceOnNeighbors(ALL): replaces the neighbor id of each keyed edge by the joined neighbor vertex
+               public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, 
Vertex<K, VV> neighbor,
+                               Collector<Tuple3<K, Edge<K, EV>, Vertex<K, 
VV>>> out) { // emits exactly one triple per joined pair
+                       out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, 
VV>>(keysWithEdge.f0, 
+                                       keysWithEdge.f2, neighbor)); // f1 (the neighbor id, used as the join key by the caller) is dropped; f0, the edge and the neighbor vertex are kept
+               }
+       }
+
+       private static final class ApplyNeighborCoGroupFunction<K extends 
Comparable<K> & Serializable, 
+               VV extends Serializable, EV extends Serializable, T> 
+               implements CoGroupFunction<Vertex<K, VV>, Tuple2<Edge<K, EV>, 
Vertex<K, VV>>, Tuple2<K, T>>,
+               ResultTypeQueryable<Tuple2<K, T>> { // coGroup wrapper used for the IN and OUT directions: hands each vertex and its (edge, neighbor) pairs to the user function
+       
+               private NeighborsFunctionWithVertexValue<K, VV, EV, T> function; // user-defined function, invoked once per coGroup call
+               
+               public ApplyNeighborCoGroupFunction 
(NeighborsFunctionWithVertexValue<K, VV, EV, T> fun) {
+                       this.function = fun;
+               }
+               public void coGroup(Iterable<Vertex<K, VV>> vertex,
+                               Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> 
neighbors, Collector<Tuple2<K, T>> out) throws Exception { // emits the single Tuple2 returned by the user function
+                       
out.collect(function.iterateNeighbors(vertex.iterator().next(), neighbors)); // NOTE(review): next() throws NoSuchElementException if the vertex side of a group is empty - confirm every edge endpoint is present in the vertex set
+               }
+               @Override
+               public TypeInformation<Tuple2<K, T>> getProducedType() { // builds the result type from keyType (declared outside this block) and the type extracted from the user function class
+                       return new TupleTypeInfo<Tuple2<K, T>>(keyType, 
+                                       
TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, 
+                                                       function.getClass(), 3, 
null, null)); // presumably 3 is the 0-based position of the result type parameter of NeighborsFunctionWithVertexValue<K, VV, EV, O> - TODO confirm
+               }
+       }
+
+       private static final class ApplyCoGroupFunctionOnAllNeighbors<K extends 
Comparable<K> & Serializable, 
+               VV extends Serializable, EV extends Serializable, T> 
+               implements CoGroupFunction<Vertex<K, VV>, Tuple3<K, Edge<K, 
EV>, Vertex<K, VV>>, Tuple2<K, T>>,
+               ResultTypeQueryable<Tuple2<K, T>> { // coGroup wrapper used for the ALL direction: strips the grouping key from each triple before calling the user function
+
+               private NeighborsFunctionWithVertexValue<K, VV, EV, T> function; // user-defined function, invoked once per coGroup call
+               
+               public ApplyCoGroupFunctionOnAllNeighbors 
(NeighborsFunctionWithVertexValue<K, VV, EV, T> fun) {
+                       this.function = fun;
+               }
+
+               public void coGroup(Iterable<Vertex<K, VV>> vertex, final 
Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keysWithNeighbors, 
+                               Collector<Tuple2<K, T>> out) throws Exception { // emits the single Tuple2 returned by the user function
+               
+                       final Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>> 
neighborsIterator = new Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>>() { // adapter: exposes (edge, neighbor) pairs on top of the (key, edge, neighbor) triples
+               
+                               final Iterator<Tuple3<K, Edge<K, EV>, Vertex<K, 
VV>>> keysWithEdgesIterator = 
+                                               keysWithNeighbors.iterator(); // obtained once, when the adapter is created
+               
+                               @Override
+                               public boolean hasNext() {
+                                       return keysWithEdgesIterator.hasNext();
+                               }
+               
+                               @Override
+                               public Tuple2<Edge<K, EV>, Vertex<K, VV>> 
next() {
+                                       Tuple3<K, Edge<K, EV>, Vertex<K, VV>> 
next = keysWithEdgesIterator.next(); 
+                                       return new Tuple2<Edge<K, EV>, 
Vertex<K, VV>>(next.f1, next.f2); // drops f0 (the grouping key); a new Tuple2 is allocated per element
+                               }
+               
+                               @Override
+                               public void remove() {
+                                       keysWithEdgesIterator.remove(); // delegates to the wrapped iterator
+                               }                       
+                       };
+                       
+                       Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> 
neighborsIterable = new Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>>() {
+                               public Iterator<Tuple2<Edge<K, EV>, Vertex<K, 
VV>>> iterator() {
+                                       return neighborsIterator; // the same iterator instance is returned on every call, so the neighbors can be traversed only once
+                               }
+                       };
+               
+                       
out.collect(function.iterateNeighbors(vertex.iterator().next(), 
neighborsIterable)); // NOTE(review): next() throws NoSuchElementException if the vertex side of a group is empty - confirm every edge endpoint is present in the vertex set
+                       }
+
+               @Override
+               public TypeInformation<Tuple2<K, T>> getProducedType() { // builds the result type from keyType (declared outside this block) and the type extracted from the user function class
+                       return new TupleTypeInfo<Tuple2<K, T>>(keyType, 
+                                       
TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, 
+                                                       function.getClass(), 3, 
null, null)); // presumably 3 is the 0-based position of the result type parameter of NeighborsFunctionWithVertexValue<K, VV, EV, O> - TODO confirm
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4b2c96d9/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
new file mode 100644
index 0000000..5158078
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
@@ -0,0 +1,13 @@
+package flink.graphs;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+public interface NeighborsFunction<K extends Comparable<K> & Serializable, VV 
extends Serializable, 
+       EV extends Serializable, O> extends Function, Serializable { // user function over a neighborhood given as (K, edge, neighbor vertex) triples; returns one Tuple2<K, O>
+
+       Tuple2<K, O> iterateEdges(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, 
VV>>> neighbors) throws Exception; // NOTE(review): named iterateEdges although it receives neighbors (cf. NeighborsFunctionWithVertexValue.iterateNeighbors); the K in each triple is presumably the id of the vertex being processed - confirm
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b2c96d9/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
new file mode 100644
index 0000000..ae7ef2e
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
@@ -0,0 +1,12 @@
+package flink.graphs;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+public interface NeighborsFunctionWithVertexValue<K extends Comparable<K> & 
Serializable, VV extends Serializable, 
+       EV extends Serializable, O> extends Function, Serializable { // user function passed to Graph.reduceOnNeighbors; O is the type of the value computed per vertex
+
+       Tuple2<K, O> iterateNeighbors(Vertex<K, VV> vertex, 
Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors) throws Exception; // called once per coGroup group with the vertex and its (edge, neighbor vertex) pairs; the returned Tuple2 is emitted as the result
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b2c96d9/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestNeighborMethods.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestNeighborMethods.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestNeighborMethods.java
deleted file mode 100644
index 6a97767..0000000
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestNeighborMethods.java
+++ /dev/null
@@ -1,290 +0,0 @@
-package flink.graphs;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(Parameterized.class)
-public class TestNeighborMethods extends JavaProgramTestBase {
-
-       private static int NUM_PROGRAMS = 6;
-       
-       private int curProgId = config.getInteger("ProgramId", -1);
-       private String resultPath;
-       private String expectedResult;
-       
-       public TestNeighborMethods(Configuration config) {
-               super(config);
-       }
-       
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
-
-       @Override
-       protected void testProgram() throws Exception {
-               expectedResult = GraphProgs.runProgram(curProgId, resultPath);
-       }
-       
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(expectedResult, resultPath);
-       }
-       
-       @Parameters
-       public static Collection<Object[]> getConfigurations() throws 
FileNotFoundException, IOException {
-
-               LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
-
-               for(int i=1; i <= NUM_PROGRAMS; i++) {
-                       Configuration config = new Configuration();
-                       config.setInteger("ProgramId", i);
-                       tConfigs.add(config);
-               }
-               
-               return toParameterList(tConfigs);
-       }
-       
-       private static class GraphProgs {
-       
-               @SuppressWarnings("serial")
-               public static String runProgram(int progId, String resultPath) 
throws Exception {
-                       
-                       switch(progId) {
-                       case 1: {
-                               /*
-                                * Get the lowest-weight out-neighbor
-                                * for each vertex
-                        */
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env), 
-                                               
TestGraphUtils.getLongLongEdgeData(env), env);
-
-                               DataSet<Tuple2<Long, Long>> 
verticesWithLowestOutNeighbor = 
-                                               graph.reduceOnEdges(new 
EdgesFunctionWithVertexValue<Long, Long, Long, Long>() {
-
-                                       public Tuple2<Long, Long> iterateEdges(
-                                                       Vertex<Long, Long> v,
-                                                       Iterable<Edge<Long, 
Long>> edges) {
-                                               
-                                               long weight = Long.MAX_VALUE;
-                                               long minNeighorId = 0;
-                                               
-                                               for (Edge<Long, Long> edge: 
edges) {
-                                                       if (edge.getValue() < 
weight) {
-                                                               weight = 
edge.getValue();
-                                                               minNeighorId = 
edge.getTarget();
-                                                       }
-                                               }
-                                               return new Tuple2<Long, 
Long>(v.getId(), minNeighorId);
-                                       }
-                               }, EdgeDirection.OUT);
-                               
verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
-                               env.execute();
-                               return "1,2\n" +
-                                               "2,3\n" + 
-                                               "3,4\n" +
-                                               "4,5\n" + 
-                                               "5,1\n";
-                       }
-                       case 2: {
-                               /*
-                                * Get the lowest-weight in-neighbor
-                                * for each vertex
-                        */
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env), 
-                                               
TestGraphUtils.getLongLongEdgeData(env), env);
-
-                               DataSet<Tuple2<Long, Long>> 
verticesWithLowestOutNeighbor = 
-                                               graph.reduceOnEdges(new 
EdgesFunctionWithVertexValue<Long, Long, Long, Long>() {
-
-                                       public Tuple2<Long, Long> iterateEdges(
-                                                       Vertex<Long, Long> v,
-                                                       Iterable<Edge<Long, 
Long>> edges) {
-                                               
-                                               long weight = Long.MAX_VALUE;
-                                               long minNeighorId = 0;
-                                               
-                                               for (Edge<Long, Long> edge: 
edges) {
-                                                       if (edge.getValue() < 
weight) {
-                                                               weight = 
edge.getValue();
-                                                               minNeighorId = 
edge.getSource();
-                                                       }
-                                               }
-                                               return new Tuple2<Long, 
Long>(v.getId(), minNeighorId);
-                                       }
-                               }, EdgeDirection.IN);
-                               
verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
-                               env.execute();
-                               return "1,5\n" +
-                                               "2,1\n" + 
-                                               "3,1\n" +
-                                               "4,3\n" + 
-                                               "5,3\n";
-                       }
-                       case 3: {
-                               /*
-                                * Get the maximum weight among all edges
-                                * of a vertex
-                        */
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env), 
-                                               
TestGraphUtils.getLongLongEdgeData(env), env);
-
-                               DataSet<Tuple2<Long, Long>> 
verticesWithMaxEdgeWeight = 
-                                               graph.reduceOnEdges(new 
EdgesFunctionWithVertexValue<Long, Long, Long, Long>() {
-
-                                       public Tuple2<Long, Long> 
iterateEdges(Vertex<Long, Long> v,
-                                                       Iterable<Edge<Long, 
Long>> edges) {
-                                               
-                                               long weight = Long.MIN_VALUE;
-
-                                               for (Edge<Long, Long> edge: 
edges) {
-                                                       if (edge.getValue() > 
weight) {
-                                                               weight = 
edge.getValue();
-                                                       }
-                                               }
-                                               return new Tuple2<Long, 
Long>(v.getId(), weight);
-                                       }
-                               }, EdgeDirection.ALL);
-                               
verticesWithMaxEdgeWeight.writeAsCsv(resultPath);
-                               env.execute();
-                               return "1,51\n" +
-                                               "2,23\n" + 
-                                               "3,35\n" +
-                                               "4,45\n" + 
-                                               "5,51\n";
-                       }
-                       case 4: {
-                               /*
-                                * Get the lowest-weight out-neighbor
-                                * for each vertex
-                        */
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env), 
-                                               
TestGraphUtils.getLongLongEdgeData(env), env);
-
-                               DataSet<Tuple2<Long, Long>> 
verticesWithLowestOutNeighbor = 
-                                               graph.reduceOnEdges(new 
EdgesFunction<Long, Long, Long>() {
-
-                                       public Tuple2<Long, Long> 
iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
-
-                                               long weight = Long.MAX_VALUE;
-                                               long minNeighorId = 0;
-                                               long vertexId = -1;
-                                               long i=0;
-
-                                               for (Tuple2<Long, Edge<Long, 
Long>> edge: edges) {
-                                                       if (edge.f1.getValue() 
< weight) {
-                                                               weight = 
edge.f1.getValue();
-                                                               minNeighorId = 
edge.f1.getTarget();
-                                                       }
-                                                       if (i==0) {
-                                                               vertexId = 
edge.f0;
-                                                       } i++;
-                                               }
-                                               return new Tuple2<Long, 
Long>(vertexId, minNeighorId);
-                                       }
-                               }, EdgeDirection.OUT);
-                               
verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
-                               env.execute();
-                               return "1,2\n" +
-                                               "2,3\n" + 
-                                               "3,4\n" +
-                                               "4,5\n" + 
-                                               "5,1\n";
-                       }
-                       case 5: {
-                               /*
-                                * Get the lowest-weight in-neighbor
-                                * for each vertex
-                        */
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env), 
-                                               
TestGraphUtils.getLongLongEdgeData(env), env);
-
-                               DataSet<Tuple2<Long, Long>> 
verticesWithLowestOutNeighbor = 
-                                               graph.reduceOnEdges(new 
EdgesFunction<Long, Long, Long>() {
-
-                                       public Tuple2<Long, Long> 
iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
-                                               
-                                               long weight = Long.MAX_VALUE;
-                                               long minNeighorId = 0;
-                                               long vertexId = -1;
-                                               long i=0;
-
-                                               for (Tuple2<Long, Edge<Long, 
Long>> edge: edges) {
-                                                       if (edge.f1.getValue() 
< weight) {
-                                                               weight = 
edge.f1.getValue();
-                                                               minNeighorId = 
edge.f1.getSource();
-                                                       }
-                                                       if (i==0) {
-                                                               vertexId = 
edge.f0;
-                                                       } i++;
-                                               }
-                                               return new Tuple2<Long, 
Long>(vertexId, minNeighorId);
-                                       }
-                               }, EdgeDirection.IN);
-                               
verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
-                               env.execute();
-                               return "1,5\n" +
-                                               "2,1\n" + 
-                                               "3,1\n" +
-                                               "4,3\n" + 
-                                               "5,3\n";
-                       }
-                       case 6: {
-                               /*
-                                * Get the maximum weight among all edges
-                                * of a vertex
-                        */
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env), 
-                                               
TestGraphUtils.getLongLongEdgeData(env), env);
-
-                               DataSet<Tuple2<Long, Long>> 
verticesWithMaxEdgeWeight = 
-                                               graph.reduceOnEdges(new 
EdgesFunction<Long, Long, Long>() {
-
-                                       public Tuple2<Long, Long> 
iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
-                                               
-                                               long weight = Long.MIN_VALUE;
-                                               long vertexId = -1;
-                                               long i=0;
-
-                                               for (Tuple2<Long, Edge<Long, 
Long>> edge: edges) {
-                                                       if (edge.f1.getValue() 
> weight) {
-                                                               weight = 
edge.f1.getValue();
-                                                       }
-                                                       if (i==0) {
-                                                               vertexId = 
edge.f0;
-                                                       } i++;
-                                               }
-                                               return new Tuple2<Long, 
Long>(vertexId, weight);
-                                       }
-                               }, EdgeDirection.ALL);
-                               
verticesWithMaxEdgeWeight.writeAsCsv(resultPath);
-                               env.execute();
-                               return "1,51\n" +
-                                               "2,23\n" + 
-                                               "3,35\n" +
-                                               "4,45\n" + 
-                                               "5,51\n";
-                       }
-                       default: 
-                               throw new IllegalArgumentException("Invalid 
program id");
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b2c96d9/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java
new file mode 100644
index 0000000..1fdccfa
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java
@@ -0,0 +1,290 @@
+package flink.graphs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class TestReduceOnEdgesMethods extends JavaProgramTestBase {
+
+       private static int NUM_PROGRAMS = 6;
+       
+       private int curProgId = config.getInteger("ProgramId", -1);
+       private String resultPath;
+       private String expectedResult;
+       
+       public TestReduceOnEdgesMethods(Configuration config) {
+               super(config);
+       }
+       
+       @Override
+       protected void preSubmit() throws Exception {
+               resultPath = getTempDirPath("result");
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               expectedResult = GraphProgs.runProgram(curProgId, resultPath);
+       }
+       
+       @Override
+       protected void postSubmit() throws Exception {
+               compareResultsByLinesInMemory(expectedResult, resultPath);
+       }
+       
+       @Parameters
+       public static Collection<Object[]> getConfigurations() throws 
FileNotFoundException, IOException {
+
+               LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
+
+               for(int i=1; i <= NUM_PROGRAMS; i++) {
+                       Configuration config = new Configuration();
+                       config.setInteger("ProgramId", i);
+                       tConfigs.add(config);
+               }
+               
+               return toParameterList(tConfigs);
+       }
+       
+       private static class GraphProgs {
+       
+               @SuppressWarnings("serial")
+               public static String runProgram(int progId, String resultPath) 
throws Exception {
+                       
+                       switch(progId) {
+                       case 1: {
+                               /*
+                                * Get the lowest-weight out-neighbor
+                                * for each vertex
+                        */
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env), 
+                                               
TestGraphUtils.getLongLongEdgeData(env), env);
+
+                               DataSet<Tuple2<Long, Long>> 
verticesWithLowestOutNeighbor = 
+                                               graph.reduceOnEdges(new 
EdgesFunctionWithVertexValue<Long, Long, Long, Long>() {
+
+                                       public Tuple2<Long, Long> iterateEdges(
+                                                       Vertex<Long, Long> v,
+                                                       Iterable<Edge<Long, 
Long>> edges) {
+                                               
+                                               long weight = Long.MAX_VALUE;
+                                               long minNeighorId = 0;
+                                               
+                                               for (Edge<Long, Long> edge: 
edges) {
+                                                       if (edge.getValue() < 
weight) {
+                                                               weight = 
edge.getValue();
+                                                               minNeighorId = 
edge.getTarget();
+                                                       }
+                                               }
+                                               return new Tuple2<Long, 
Long>(v.getId(), minNeighorId);
+                                       }
+                               }, EdgeDirection.OUT);
+                               
verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
+                               env.execute();
+                               return "1,2\n" +
+                                               "2,3\n" + 
+                                               "3,4\n" +
+                                               "4,5\n" + 
+                                               "5,1\n";
+                       }
+                       case 2: {
+                               /*
+                                * Get the lowest-weight in-neighbor
+                                * for each vertex
+                        */
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env), 
+                                               
TestGraphUtils.getLongLongEdgeData(env), env);
+
+                               DataSet<Tuple2<Long, Long>> 
verticesWithLowestOutNeighbor = 
+                                               graph.reduceOnEdges(new 
EdgesFunctionWithVertexValue<Long, Long, Long, Long>() {
+
+                                       public Tuple2<Long, Long> iterateEdges(
+                                                       Vertex<Long, Long> v,
+                                                       Iterable<Edge<Long, 
Long>> edges) {
+                                               
+                                               long weight = Long.MAX_VALUE;
+                                               long minNeighorId = 0;
+                                               
+                                               for (Edge<Long, Long> edge: 
edges) {
+                                                       if (edge.getValue() < 
weight) {
+                                                               weight = 
edge.getValue();
+                                                               minNeighorId = 
edge.getSource();
+                                                       }
+                                               }
+                                               return new Tuple2<Long, 
Long>(v.getId(), minNeighorId);
+                                       }
+                               }, EdgeDirection.IN);
+                               
verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
+                               env.execute();
+                               return "1,5\n" +
+                                               "2,1\n" + 
+                                               "3,1\n" +
+                                               "4,3\n" + 
+                                               "5,3\n";
+                       }
+                       case 3: {
+                               /*
+                                * Get the maximum weight among all edges
+                                * of a vertex
+                        */
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env), 
+                                               
TestGraphUtils.getLongLongEdgeData(env), env);
+
+                               DataSet<Tuple2<Long, Long>> 
verticesWithMaxEdgeWeight = 
+                                               graph.reduceOnEdges(new 
EdgesFunctionWithVertexValue<Long, Long, Long, Long>() {
+
+                                       public Tuple2<Long, Long> 
iterateEdges(Vertex<Long, Long> v,
+                                                       Iterable<Edge<Long, 
Long>> edges) {
+                                               
+                                               long weight = Long.MIN_VALUE;
+
+                                               for (Edge<Long, Long> edge: 
edges) {
+                                                       if (edge.getValue() > 
weight) {
+                                                               weight = 
edge.getValue();
+                                                       }
+                                               }
+                                               return new Tuple2<Long, 
Long>(v.getId(), weight);
+                                       }
+                               }, EdgeDirection.ALL);
+                               
verticesWithMaxEdgeWeight.writeAsCsv(resultPath);
+                               env.execute();
+                               return "1,51\n" +
+                                               "2,23\n" + 
+                                               "3,35\n" +
+                                               "4,45\n" + 
+                                               "5,51\n";
+                       }
+                       case 4: {
+                               /*
+                                * Get the lowest-weight out-neighbor
+                                * for each vertex
+                        */
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env), 
+                                               
TestGraphUtils.getLongLongEdgeData(env), env);
+
+                               DataSet<Tuple2<Long, Long>> 
verticesWithLowestOutNeighbor = 
+                                               graph.reduceOnEdges(new 
EdgesFunction<Long, Long, Long>() {
+
+                                       public Tuple2<Long, Long> 
iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
+
+                                               long weight = Long.MAX_VALUE;
+                                               long minNeighorId = 0;
+                                               long vertexId = -1;
+                                               long i=0;
+
+                                               for (Tuple2<Long, Edge<Long, 
Long>> edge: edges) {
+                                                       if (edge.f1.getValue() 
< weight) {
+                                                               weight = 
edge.f1.getValue();
+                                                               minNeighorId = 
edge.f1.getTarget();
+                                                       }
+                                                       if (i==0) {
+                                                               vertexId = 
edge.f0;
+                                                       } i++;
+                                               }
+                                               return new Tuple2<Long, 
Long>(vertexId, minNeighorId);
+                                       }
+                               }, EdgeDirection.OUT);
+                               
verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
+                               env.execute();
+                               return "1,2\n" +
+                                               "2,3\n" + 
+                                               "3,4\n" +
+                                               "4,5\n" + 
+                                               "5,1\n";
+                       }
+                       case 5: {
+                               /*
+                                * Get the lowest-weight in-neighbor
+                                * for each vertex
+                        */
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env), 
+                                               
TestGraphUtils.getLongLongEdgeData(env), env);
+
+                               DataSet<Tuple2<Long, Long>> 
verticesWithLowestOutNeighbor = 
+                                               graph.reduceOnEdges(new 
EdgesFunction<Long, Long, Long>() {
+
+                                       public Tuple2<Long, Long> 
iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
+                                               
+                                               long weight = Long.MAX_VALUE;
+                                               long minNeighorId = 0;
+                                               long vertexId = -1;
+                                               long i=0;
+
+                                               for (Tuple2<Long, Edge<Long, 
Long>> edge: edges) {
+                                                       if (edge.f1.getValue() 
< weight) {
+                                                               weight = 
edge.f1.getValue();
+                                                               minNeighorId = 
edge.f1.getSource();
+                                                       }
+                                                       if (i==0) {
+                                                               vertexId = 
edge.f0;
+                                                       } i++;
+                                               }
+                                               return new Tuple2<Long, 
Long>(vertexId, minNeighorId);
+                                       }
+                               }, EdgeDirection.IN);
+                               
verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
+                               env.execute();
+                               return "1,5\n" +
+                                               "2,1\n" + 
+                                               "3,1\n" +
+                                               "4,3\n" + 
+                                               "5,3\n";
+                       }
+                       case 6: {
+                               /*
+                                * Get the maximum weight among all edges
+                                * of a vertex
+                        */
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env), 
+                                               
TestGraphUtils.getLongLongEdgeData(env), env);
+
+                               DataSet<Tuple2<Long, Long>> 
verticesWithMaxEdgeWeight = 
+                                               graph.reduceOnEdges(new 
EdgesFunction<Long, Long, Long>() {
+
+                                       public Tuple2<Long, Long> 
iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
+                                               
+                                               long weight = Long.MIN_VALUE;
+                                               long vertexId = -1;
+                                               long i=0;
+
+                                               for (Tuple2<Long, Edge<Long, 
Long>> edge: edges) {
+                                                       if (edge.f1.getValue() 
> weight) {
+                                                               weight = 
edge.f1.getValue();
+                                                       }
+                                                       if (i==0) {
+                                                               vertexId = 
edge.f0;
+                                                       } i++;
+                                               }
+                                               return new Tuple2<Long, 
Long>(vertexId, weight);
+                                       }
+                               }, EdgeDirection.ALL);
+                               
verticesWithMaxEdgeWeight.writeAsCsv(resultPath);
+                               env.execute();
+                               return "1,51\n" +
+                                               "2,23\n" + 
+                                               "3,35\n" +
+                                               "4,45\n" + 
+                                               "5,51\n";
+                       }
+                       default: 
+                               throw new IllegalArgumentException("Invalid 
program id");
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b2c96d9/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java
new file mode 100644
index 0000000..b8927aa
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java
@@ -0,0 +1,157 @@
+package flink.graphs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class TestReduceOnNeighborMethods extends JavaProgramTestBase {
+
+       private static int NUM_PROGRAMS = 3;
+       
+       private int curProgId = config.getInteger("ProgramId", -1);
+       private String resultPath;
+       private String expectedResult;
+       
+       public TestReduceOnNeighborMethods(Configuration config) {
+               super(config);
+       }
+       
+       @Override
+       protected void preSubmit() throws Exception {
+               resultPath = getTempDirPath("result");
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               expectedResult = GraphProgs.runProgram(curProgId, resultPath);
+       }
+       
+       @Override
+       protected void postSubmit() throws Exception {
+               compareResultsByLinesInMemory(expectedResult, resultPath);
+       }
+       
+       @Parameters
+       public static Collection<Object[]> getConfigurations() throws 
FileNotFoundException, IOException {
+
+               LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
+
+               for(int i=1; i <= NUM_PROGRAMS; i++) {
+                       Configuration config = new Configuration();
+                       config.setInteger("ProgramId", i);
+                       tConfigs.add(config);
+               }
+               
+               return toParameterList(tConfigs);
+       }
+       
+       private static class GraphProgs {
+       
+               @SuppressWarnings("serial")
+               public static String runProgram(int progId, String resultPath) 
throws Exception {
+                       
+                       switch(progId) {
+                       case 1: {
+                               /*
+                                * Get the sum of out-neighbor values
+                                * for each vertex
+                        */
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env), 
+                                               
TestGraphUtils.getLongLongEdgeData(env), env);
+
+                               DataSet<Tuple2<Long, Long>> 
verticesWithSumOfOutNeighborValues = 
+                                               graph.reduceOnNeighbors(new 
NeighborsFunctionWithVertexValue<Long, Long, Long, Long>() {
+                                                       public Tuple2<Long, 
Long> iterateNeighbors(Vertex<Long, Long> vertex,
+                                                                       
Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+                                                               long sum = 0;
+                                                               for 
(Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+                                                                       sum += 
neighbor.f1.getValue();
+                                                               }
+                                                               return new 
Tuple2<Long, Long>(vertex.getId(), sum);
+                                                       }
+                                               }, EdgeDirection.OUT);
+
+                               
verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+                               env.execute();
+                               return "1,5\n" +
+                                               "2,3\n" + 
+                                               "3,9\n" +
+                                               "4,5\n" + 
+                                               "5,1\n";
+                       }
+                       case 2: {
+                               /*
+                                * Get the sum of in-neighbor values
+                                * times edge weights for each vertex
+                        */
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env), 
+                                               
TestGraphUtils.getLongLongEdgeData(env), env);
+
+                               DataSet<Tuple2<Long, Long>> verticesWithSum = 
+                                               graph.reduceOnNeighbors(new 
NeighborsFunctionWithVertexValue<Long, Long, Long, Long>() {
+                                                       public Tuple2<Long, 
Long> iterateNeighbors(Vertex<Long, Long> vertex,
+                                                                       
Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+                                                               long sum = 0;
+                                                               for 
(Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+                                                                       sum += 
neighbor.f0.getValue() * neighbor.f1.getValue();
+                                                               }
+                                                               return new 
Tuple2<Long, Long>(vertex.getId(), sum);
+                                                       }
+                                               }, EdgeDirection.IN);           
+
+                               verticesWithSum.writeAsCsv(resultPath);
+                               env.execute();
+                               return "1,255\n" +
+                                               "2,12\n" + 
+                                               "3,59\n" +
+                                               "4,102\n" + 
+                                               "5,285\n";
+                       }
+                       case 3: {
+                               /*
+                                * Get the sum of all neighbor values
+                                * for each vertex
+                        */
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env), 
+                                               
TestGraphUtils.getLongLongEdgeData(env), env);
+
+                               DataSet<Tuple2<Long, Long>> 
verticesWithSumOfOutNeighborValues = 
+                                               graph.reduceOnNeighbors(new 
NeighborsFunctionWithVertexValue<Long, Long, Long, Long>() {
+                                                       public Tuple2<Long, 
Long> iterateNeighbors(Vertex<Long, Long> vertex,
+                                                                       
Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+                                                               long sum = 0;
+                                                               for 
(Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+                                                                       sum += 
neighbor.f1.getValue();
+                                                               }
+                                                               return new 
Tuple2<Long, Long>(vertex.getId(), sum);
+                                                       }
+                                               }, EdgeDirection.ALL);
+
+                               
verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+                               env.execute();
+                               return "1,10\n" +
+                                               "2,4\n" + 
+                                               "3,12\n" +
+                                               "4,8\n" + 
+                                               "5,8\n";
+                       }
+                       default: 
+                               throw new IllegalArgumentException("Invalid 
program id");
+                       }
+               }
+       }
+}
\ No newline at end of file

Reply via email to