http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnEdgesMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnEdgesMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnEdgesMethodsITCase.java
new file mode 100644
index 0000000..29d76f0
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnEdgesMethodsITCase.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.EdgesFunction;
+import org.apache.flink.graph.EdgesFunctionWithVertexValue;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
+
+       /**
+        * Creates the test case for one execution mode; the class is run with
+        * the {@link Parameterized} runner.
+        *
+        * @param mode execution mode passed through to {@link MultipleProgramsTestBase}
+        */
+       public ReduceOnEdgesMethodsITCase(final MultipleProgramsTestBase.ExecutionMode mode) {
+               super(mode);
+       }
+
+    // URI string of the temporary file the program under test writes to; assigned in before().
+    private String resultPath;
+    // Expected output, assigned by each test method and checked in after().
+    private String expectedResult;
+
+    // Source of the temporary output file created in before().
+    @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       /**
+        * Creates a new temporary file and stores its URI as the output location
+        * for the test that is about to run.
+        *
+        * @throws Exception if the temporary file cannot be created
+        */
+       @Before
+       public void before() throws Exception {
+               this.resultPath = this.tempFolder.newFile().toURI().toString();
+       }
+
+       /**
+        * Compares the output at {@code resultPath} with {@code expectedResult}
+        * after the test method has run.
+        *
+        * @throws Exception propagated from {@code compareResultsByLinesInMemory}
+        */
+       @After
+       public void after() throws Exception {
+               compareResultsByLinesInMemory(this.expectedResult, this.resultPath);
+       }
+
+       /**
+        * Gets the lowest-weight out-neighbor for each vertex, using
+        * {@link SelectMinWeightNeighbor} with {@link EdgeDirection#OUT}.
+        */
+       @Test
+       public void testLowestWeightOutNeighbor() throws Exception {
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+               final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(execEnv),
+                               TestGraphUtils.getLongLongEdgeData(execEnv),
+                               execEnv);
+
+               // (vertexId, neighborId) pairs produced by the edge function.
+               final DataSet<Tuple2<Long, Long>> lowestOutNeighbors =
+                               testGraph.reduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT);
+
+               lowestOutNeighbors.writeAsCsv(resultPath);
+               execEnv.execute();
+
+               expectedResult = "1,2\n"
+                               + "2,3\n"
+                               + "3,4\n"
+                               + "4,5\n"
+                               + "5,1\n";
+       }
+
+       /**
+        * Gets the lowest-weight in-neighbor for each vertex, using
+        * {@link SelectMinWeightInNeighbor} with {@link EdgeDirection#IN}.
+        */
+       @Test
+       public void testLowestWeightInNeighbor() throws Exception {
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+               final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(execEnv),
+                               TestGraphUtils.getLongLongEdgeData(execEnv),
+                               execEnv);
+
+               // (vertexId, neighborId) pairs produced by the edge function.
+               final DataSet<Tuple2<Long, Long>> lowestInNeighbors =
+                               testGraph.reduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN);
+
+               lowestInNeighbors.writeAsCsv(resultPath);
+               execEnv.execute();
+
+               expectedResult = "1,5\n"
+                               + "2,1\n"
+                               + "3,1\n"
+                               + "4,3\n"
+                               + "5,3\n";
+       }
+
+       /**
+        * Gets the maximum weight among all edges of a vertex, using
+        * {@link SelectMaxWeightNeighbor} with {@link EdgeDirection#ALL}.
+        */
+       @Test
+       public void testMaxWeightEdge() throws Exception {
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+               final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(execEnv),
+                               TestGraphUtils.getLongLongEdgeData(execEnv),
+                               execEnv);
+
+               // (vertexId, maxEdgeWeight) pairs produced by the edge function.
+               final DataSet<Tuple2<Long, Long>> maxEdgeWeights =
+                               testGraph.reduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL);
+
+               maxEdgeWeights.writeAsCsv(resultPath);
+               execEnv.execute();
+
+               expectedResult = "1,51\n"
+                               + "2,23\n"
+                               + "3,35\n"
+                               + "4,45\n"
+                               + "5,51\n";
+       }
+
+       /**
+        * Gets the lowest-weight out-neighbor for each vertex through the
+        * {@link EdgesFunction} variant {@link SelectMinWeightNeighborNoValue}.
+        */
+       @Test
+       public void testLowestWeightOutNeighborNoValue() throws Exception {
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+               final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(execEnv),
+                               TestGraphUtils.getLongLongEdgeData(execEnv),
+                               execEnv);
+
+               // (vertexId, neighborId) pairs produced by the edge function.
+               final DataSet<Tuple2<Long, Long>> lowestOutNeighbors =
+                               testGraph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.OUT);
+
+               lowestOutNeighbors.writeAsCsv(resultPath);
+               execEnv.execute();
+
+               expectedResult = "1,2\n"
+                               + "2,3\n"
+                               + "3,4\n"
+                               + "4,5\n"
+                               + "5,1\n";
+       }
+
+       /**
+        * Gets the lowest-weight in-neighbor for each vertex through the
+        * {@link EdgesFunction} variant {@link SelectMinWeightInNeighborNoValue}.
+        */
+       @Test
+       public void testLowestWeightInNeighborNoValue() throws Exception {
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+               final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(execEnv),
+                               TestGraphUtils.getLongLongEdgeData(execEnv),
+                               execEnv);
+
+               // (vertexId, neighborId) pairs produced by the edge function.
+               final DataSet<Tuple2<Long, Long>> lowestInNeighbors =
+                               testGraph.reduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN);
+
+               lowestInNeighbors.writeAsCsv(resultPath);
+               execEnv.execute();
+
+               expectedResult = "1,5\n"
+                               + "2,1\n"
+                               + "3,1\n"
+                               + "4,3\n"
+                               + "5,3\n";
+       }
+
+       /**
+        * Gets the maximum weight among all edges of a vertex through the
+        * {@link EdgesFunction} variant {@link SelectMaxWeightNeighborNoValue}.
+        */
+       @Test
+       public void testMaxWeightAllNeighbors() throws Exception {
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+               final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(execEnv),
+                               TestGraphUtils.getLongLongEdgeData(execEnv),
+                               execEnv);
+
+               // (vertexId, maxEdgeWeight) pairs produced by the edge function.
+               final DataSet<Tuple2<Long, Long>> maxEdgeWeights =
+                               testGraph.reduceOnEdges(new SelectMaxWeightNeighborNoValue(), EdgeDirection.ALL);
+
+               maxEdgeWeights.writeAsCsv(resultPath);
+               execEnv.execute();
+
+               expectedResult = "1,51\n"
+                               + "2,23\n"
+                               + "3,35\n"
+                               + "4,45\n"
+                               + "5,51\n";
+       }
+
+       @SuppressWarnings("serial")
+       private static final class SelectMinWeightNeighbor implements 
EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+               public Tuple2<Long, Long> iterateEdges(
+                               Vertex<Long, Long> v,
+                               Iterable<Edge<Long, Long>> edges) {
+                       
+                       long weight = Long.MAX_VALUE;
+                       long minNeighorId = 0;
+                       
+                       for (Edge<Long, Long> edge: edges) {
+                               if (edge.getValue() < weight) {
+                                       weight = edge.getValue();
+                                       minNeighorId = edge.getTarget();
+                               }
+                       }
+                       return new Tuple2<Long, Long>(v.getId(), minNeighorId);
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class SelectMaxWeightNeighbor implements 
EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+               public Tuple2<Long, Long> iterateEdges(Vertex<Long, Long> v,
+                               Iterable<Edge<Long, Long>> edges) {
+                       
+                       long weight = Long.MIN_VALUE;
+
+                       for (Edge<Long, Long> edge: edges) {
+                               if (edge.getValue() > weight) {
+                                       weight = edge.getValue();
+                               }
+                       }
+                       return new Tuple2<Long, Long>(v.getId(), weight);
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class SelectMinWeightNeighborNoValue implements 
EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+               public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, 
Edge<Long, Long>>> edges) {
+
+                       long weight = Long.MAX_VALUE;
+                       long minNeighorId = 0;
+                       long vertexId = -1;
+                       long i=0;
+
+                       for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
+                               if (edge.f1.getValue() < weight) {
+                                       weight = edge.f1.getValue();
+                                       minNeighorId = edge.f1.getTarget();
+                               }
+                               if (i==0) {
+                                       vertexId = edge.f0;
+                               } i++;
+                       }
+                       return new Tuple2<Long, Long>(vertexId, minNeighorId);
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class SelectMaxWeightNeighborNoValue implements 
EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+               public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, 
Edge<Long, Long>>> edges) {
+                       
+                       long weight = Long.MIN_VALUE;
+                       long vertexId = -1;
+                       long i=0;
+
+                       for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
+                               if (edge.f1.getValue() > weight) {
+                                       weight = edge.f1.getValue();
+                               }
+                               if (i==0) {
+                                       vertexId = edge.f0;
+                               } i++;
+                       }
+                       return new Tuple2<Long, Long>(vertexId, weight);
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class SelectMinWeightInNeighbor implements 
EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+               public Tuple2<Long, Long> iterateEdges(
+                               Vertex<Long, Long> v,
+                               Iterable<Edge<Long, Long>> edges) {
+                       
+                       long weight = Long.MAX_VALUE;
+                       long minNeighorId = 0;
+                       
+                       for (Edge<Long, Long> edge: edges) {
+                               if (edge.getValue() < weight) {
+                                       weight = edge.getValue();
+                                       minNeighorId = edge.getSource();
+                               }
+                       }
+                       return new Tuple2<Long, Long>(v.getId(), minNeighorId);
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class SelectMinWeightInNeighborNoValue implements 
EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+               public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, 
Edge<Long, Long>>> edges) {
+                       
+                       long weight = Long.MAX_VALUE;
+                       long minNeighorId = 0;
+                       long vertexId = -1;
+                       long i=0;
+
+                       for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
+                               if (edge.f1.getValue() < weight) {
+                                       weight = edge.f1.getValue();
+                                       minNeighorId = edge.f1.getSource();
+                               }
+                               if (i==0) {
+                                       vertexId = edge.f0;
+                               } i++;
+                       }
+                       return new Tuple2<Long, Long>(vertexId, minNeighorId);
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnNeighborMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnNeighborMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnNeighborMethodsITCase.java
new file mode 100644
index 0000000..d385399
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnNeighborMethodsITCase.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import java.util.Iterator;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.NeighborsFunction;
+import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
+
+       /**
+        * Creates the test case for one execution mode; the class is run with
+        * the {@link Parameterized} runner.
+        *
+        * @param mode execution mode passed through to {@link MultipleProgramsTestBase}
+        */
+       public ReduceOnNeighborMethodsITCase(final MultipleProgramsTestBase.ExecutionMode mode) {
+               super(mode);
+       }
+
+    // URI string of the temporary file the program under test writes to; assigned in before().
+    private String resultPath;
+    // Expected output, assigned by each test method and checked in after().
+    private String expectedResult;
+
+    // Source of the temporary output file created in before().
+    @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       /**
+        * Creates a new temporary file and stores its URI as the output location
+        * for the test that is about to run.
+        *
+        * @throws Exception if the temporary file cannot be created
+        */
+       @Before
+       public void before() throws Exception {
+               this.resultPath = this.tempFolder.newFile().toURI().toString();
+       }
+
+       /**
+        * Compares the output at {@code resultPath} with {@code expectedResult}
+        * after the test method has run.
+        *
+        * @throws Exception propagated from {@code compareResultsByLinesInMemory}
+        */
+       @After
+       public void after() throws Exception {
+               compareResultsByLinesInMemory(this.expectedResult, this.resultPath);
+       }
+
+       /**
+        * Gets the sum of out-neighbor values for each vertex, using
+        * {@link SumOutNeighbors} with {@link EdgeDirection#OUT}.
+        */
+       @Test
+       public void testSumOfOutNeighbors() throws Exception {
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+               final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(execEnv),
+                               TestGraphUtils.getLongLongEdgeData(execEnv),
+                               execEnv);
+
+               // (vertexId, sum) pairs produced by the neighbors function.
+               final DataSet<Tuple2<Long, Long>> outNeighborSums =
+                               testGraph.reduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT);
+
+               outNeighborSums.writeAsCsv(resultPath);
+               execEnv.execute();
+
+               expectedResult = "1,5\n"
+                               + "2,3\n"
+                               + "3,9\n"
+                               + "4,5\n"
+                               + "5,1\n";
+       }
+
+       /**
+        * Gets the sum of in-neighbor values times the edge weights for each
+        * vertex, using {@link SumInNeighbors} with {@link EdgeDirection#IN}.
+        */
+       @Test
+       public void testSumOfInNeighbors() throws Exception {
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+               final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(execEnv),
+                               TestGraphUtils.getLongLongEdgeData(execEnv),
+                               execEnv);
+
+               // (vertexId, weightedSum) pairs produced by the neighbors function.
+               final DataSet<Tuple2<Long, Long>> weightedInNeighborSums =
+                               testGraph.reduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN);
+
+               weightedInNeighborSums.writeAsCsv(resultPath);
+               execEnv.execute();
+
+               expectedResult = "1,255\n"
+                               + "2,12\n"
+                               + "3,59\n"
+                               + "4,102\n"
+                               + "5,285\n";
+       }
+
+       /**
+        * Gets the sum of all neighbor values, including the vertex's own value,
+        * for each vertex, using {@link SumAllNeighbors} with
+        * {@link EdgeDirection#ALL}.
+        */
+       @Test
+       public void testSumOfOAllNeighbors() throws Exception {
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+               final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(execEnv),
+                               TestGraphUtils.getLongLongEdgeData(execEnv),
+                               execEnv);
+
+               // (vertexId, sum) pairs produced by the neighbors function.
+               final DataSet<Tuple2<Long, Long>> allNeighborSums =
+                               testGraph.reduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL);
+
+               allNeighborSums.writeAsCsv(resultPath);
+               execEnv.execute();
+
+               expectedResult = "1,11\n"
+                               + "2,6\n"
+                               + "3,15\n"
+                               + "4,12\n"
+                               + "5,13\n";
+       }
+
+       /**
+        * Gets the sum of out-neighbor values for each vertex through the
+        * {@link NeighborsFunction} variant {@link SumOutNeighborsNoValue}.
+        */
+       @Test
+       public void testSumOfOutNeighborsNoValue() throws Exception {
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+               final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(execEnv),
+                               TestGraphUtils.getLongLongEdgeData(execEnv),
+                               execEnv);
+
+               // (vertexId, sum) pairs produced by the neighbors function.
+               final DataSet<Tuple2<Long, Long>> outNeighborSums =
+                               testGraph.reduceOnNeighbors(new SumOutNeighborsNoValue(), EdgeDirection.OUT);
+
+               outNeighborSums.writeAsCsv(resultPath);
+               execEnv.execute();
+
+               expectedResult = "1,5\n"
+                               + "2,3\n"
+                               + "3,9\n"
+                               + "4,5\n"
+                               + "5,1\n";
+       }
+
+       /**
+        * Gets the sum of in-neighbor values times the edge weights for each
+        * vertex through the {@link NeighborsFunction} variant
+        * {@link SumInNeighborsNoValue}.
+        */
+       @Test
+       public void testSumOfInNeighborsNoValue() throws Exception {
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+               final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(execEnv),
+                               TestGraphUtils.getLongLongEdgeData(execEnv),
+                               execEnv);
+
+               // (vertexId, weightedSum) pairs produced by the neighbors function.
+               final DataSet<Tuple2<Long, Long>> weightedInNeighborSums =
+                               testGraph.reduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN);
+
+               weightedInNeighborSums.writeAsCsv(resultPath);
+               execEnv.execute();
+
+               expectedResult = "1,255\n"
+                               + "2,12\n"
+                               + "3,59\n"
+                               + "4,102\n"
+                               + "5,285\n";
+       }
+
+       /**
+        * Gets the sum of all neighbor values for each vertex through the
+        * {@link NeighborsFunction} variant {@link SumAllNeighborsNoValue}.
+        */
+       @Test
+       public void testSumOfAllNeighborsNoValue() throws Exception {
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+               final Graph<Long, Long, Long> testGraph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(execEnv),
+                               TestGraphUtils.getLongLongEdgeData(execEnv),
+                               execEnv);
+
+               // (vertexId, sum) pairs produced by the neighbors function.
+               final DataSet<Tuple2<Long, Long>> allNeighborSums =
+                               testGraph.reduceOnNeighbors(new SumAllNeighborsNoValue(), EdgeDirection.ALL);
+
+               allNeighborSums.writeAsCsv(resultPath);
+               execEnv.execute();
+
+               expectedResult = "1,10\n"
+                               + "2,4\n"
+                               + "3,12\n"
+                               + "4,8\n"
+                               + "5,8\n";
+       }
+
+       @SuppressWarnings("serial")
+       private static final class SumOutNeighbors implements 
NeighborsFunctionWithVertexValue<Long, Long, Long, 
+       Tuple2<Long, Long>> {
+
+               public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> 
vertex,
+                               Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, 
Long>>> neighbors) {
+                       
+                       long sum = 0;
+                       for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> 
neighbor : neighbors) {
+                               sum += neighbor.f1.getValue();
+                       }
+                       return new Tuple2<Long, Long>(vertex.getId(), sum);
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class SumInNeighbors implements 
NeighborsFunctionWithVertexValue<Long, Long, Long, 
+               Tuple2<Long, Long>> {
+               
+               public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> 
vertex,
+                               Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, 
Long>>> neighbors) {
+               
+                       long sum = 0;
+                       for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> 
neighbor : neighbors) {
+                               sum += neighbor.f0.getValue() * 
neighbor.f1.getValue();
+                       }
+                       return new Tuple2<Long, Long>(vertex.getId(), sum);
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class SumAllNeighbors implements 
NeighborsFunctionWithVertexValue<Long, Long, Long, 
+               Tuple2<Long, Long>> {
+
+               public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> 
vertex,
+               Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> 
neighbors) {
+       
+                       long sum = 0;
+                       for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> 
neighbor : neighbors) {
+                               sum += neighbor.f1.getValue();
+                       }
+                       return new Tuple2<Long, Long>(vertex.getId(), sum + 
vertex.getValue());
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class SumOutNeighborsNoValue implements 
NeighborsFunction<Long, Long, Long, 
+               Tuple2<Long, Long>> {
+
+               public Tuple2<Long, Long> iterateNeighbors(
+                               Iterable<Tuple3<Long, Edge<Long, Long>, 
Vertex<Long, Long>>> neighbors) {
+
+                       long sum = 0;
+                       Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next 
= null;
+                       Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, 
Long>>> neighborsIterator =
+                                       neighbors.iterator();
+                       while(neighborsIterator.hasNext()) {
+                               next = neighborsIterator.next();
+                               sum += next.f2.getValue();
+                       }
+                       return new Tuple2<Long, Long>(next.f0, sum);
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class SumInNeighborsNoValue implements 
NeighborsFunction<Long, Long, Long, 
+               Tuple2<Long, Long>> {
+               
+               public Tuple2<Long, Long> iterateNeighbors(
+                               Iterable<Tuple3<Long, Edge<Long, Long>, 
Vertex<Long, Long>>> neighbors) {
+               
+                       long sum = 0;
+                       Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next 
= null;
+                       Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, 
Long>>> neighborsIterator =
+                                       neighbors.iterator();
+                       while(neighborsIterator.hasNext()) {
+                               next = neighborsIterator.next();
+                               sum += next.f2.getValue() * next.f1.getValue();
+                       }
+                       return new Tuple2<Long, Long>(next.f0, sum);
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class SumAllNeighborsNoValue implements 
NeighborsFunction<Long, Long, Long, 
+               Tuple2<Long, Long>> {
+
+               public Tuple2<Long, Long> iterateNeighbors(
+                               Iterable<Tuple3<Long, Edge<Long, Long>, 
Vertex<Long, Long>>> neighbors) {
+       
+                       long sum = 0;
+                       Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next 
= null;
+                       Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, 
Long>>> neighborsIterator =
+                                       neighbors.iterator();
+                       while(neighborsIterator.hasNext()) {
+                               next = neighborsIterator.next();
+                               sum += next.f2.getValue();
+                       }
+                       return new Tuple2<Long, Long>(next.f0, sum);
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestDegrees.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestDegrees.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestDegrees.java
deleted file mode 100644
index c572647..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestDegrees.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.NullValue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-/**
- * Tests for the degree methods of Graph: outDegrees(), inDegrees() and
- * getDegrees().
- *
- * Every test writes its result as CSV to a temporary file and stores the text
- * it expects in expectedResult; the actual comparison is done in after().
- */
-@RunWith(Parameterized.class)
-public class TestDegrees extends MultipleProgramsTestBase {
-
-       // The execution mode is supplied by the Parameterized runner and handed to the base class.
-       public TestDegrees(MultipleProgramsTestBase.ExecutionMode mode){
-               super(mode);
-       }
-
-    // URI of the temporary file the test under execution writes to
-    private String resultPath;
-    // Expected file content; assigned by each test, read in after()
-    private String expectedResult;
-
-    @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       // Creates a fresh output file for each test.
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       // Runs after each test: checks the written file against expectedResult
-       // using the inherited compareResultsByLinesInMemory helper.
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expectedResult, resultPath);
-       }
-
-       @Test
-       public void testOutDegrees() throws Exception {
-               /*
-               * Test outDegrees()
-               */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        graph.outDegrees().writeAsCsv(resultPath);
-        env.execute();
-
-        // presumably one "vertexId,outDegree" line per vertex
-        expectedResult = "1,2\n" +
-                    "2,1\n" +
-                    "3,2\n" +
-                    "4,1\n" +
-                    "5,1\n";
-    }
-
-       @Test
-       public void testOutDegreesWithNoOutEdges() throws Exception {
-               /*
-                * Test outDegrees() no outgoing edges
-                */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env);
-
-        graph.outDegrees().writeAsCsv(resultPath);
-        env.execute();
-
-        // vertex 5 is expected to be reported with degree 0
-        expectedResult = "1,3\n" +
-                "2,1\n" +
-                "3,1\n" +
-                "4,1\n" +
-                "5,0\n";
-    }
-
-       @Test
-       public void testInDegrees() throws Exception {
-               /*
-                * Test inDegrees()
-                */
-           final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-           Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                   TestGraphUtils.getLongLongEdgeData(env), env);
-
-           graph.inDegrees().writeAsCsv(resultPath);
-           env.execute();
-           expectedResult = "1,1\n" +
-                           "2,1\n" +
-                           "3,2\n" +
-                           "4,1\n" +
-                           "5,2\n";
-    }
-
-       @Test
-       public void testInDegreesWithNoInEdge() throws Exception {
-               /*
-                * Test inDegrees() no ingoing edge
-                */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env);
-
-        graph.inDegrees().writeAsCsv(resultPath);
-        env.execute();
-        // vertex 1 is expected to be reported with degree 0
-        expectedResult = "1,0\n" +
-                       "2,1\n" +
-                       "3,1\n" +
-                       "4,1\n" +
-                       "5,3\n";
-    }
-
-       @Test
-       public void testGetDegrees() throws Exception {
-               /*
-                * Test getDegrees()
-                */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        graph.getDegrees().writeAsCsv(resultPath);
-        env.execute();
-        expectedResult = "1,3\n" +
-                       "2,2\n" +
-                       "3,4\n" +
-                       "4,2\n" +
-                       "5,3\n";
-    }
-
-       @Test
-       public void testGetDegreesWithDisconnectedData() throws Exception {
-        /*
-                * Test outDegrees() on a graph built only from disconnected edge data.
-                * NOTE: despite the method name, the call exercised below is
-                * outDegrees(), not getDegrees().
-                */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, NullValue, Long> graph =
-                
Graph.fromDataSet(TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
-
-        graph.outDegrees().writeAsCsv(resultPath);
-        env.execute();
-        expectedResult = "1,2\n" +
-                "2,1\n" +
-                "3,0\n" +
-                "4,1\n" +
-                "5,0\n";
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestFromCollection.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestFromCollection.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestFromCollection.java
deleted file mode 100644
index e1b96e8..0000000
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestFromCollection.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.NullValue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-/**
- * Tests for the Graph.fromCollection(...) factory methods.
- *
- * Every test writes vertices or edges as CSV to a temporary file and stores the
- * text it expects in expectedResult; the actual comparison is done in after().
- */
-@RunWith(Parameterized.class)
-public class TestFromCollection extends MultipleProgramsTestBase {
-
-       // The execution mode is supplied by the Parameterized runner and handed to the base class.
-       public TestFromCollection(MultipleProgramsTestBase.ExecutionMode mode){
-               super(mode);
-       }
-
-    // URI of the temporary file the test under execution writes to
-    private String resultPath;
-    // Expected file content; assigned by each test, read in after()
-    private String expectedResult;
-
-    @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       // Creates a fresh output file for each test.
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       // Runs after each test: checks the written file against expectedResult
-       // using the inherited compareResultsByLinesInMemory helper.
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expectedResult, resultPath);
-       }
-
-       @Test
-       public void testFromCollectionVerticesEdges() throws Exception {
-               /*
-                * Test fromCollection(vertices, edges):
-                */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-        Graph<Long, Long, Long> graph = 
Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
-                TestGraphUtils.getLongLongEdges(), env);
-
-        graph.getEdges().writeAsCsv(resultPath);
-        env.execute();
-        // presumably one "source,target,value" line per edge
-        expectedResult = "1,2,12\n" +
-                       "1,3,13\n" +
-                       "2,3,23\n" +
-                       "3,4,34\n" +
-                       "3,5,35\n" +
-                       "4,5,45\n" +
-                       "5,1,51\n";
-    }
-
-       @Test
-       public void testFromCollectionEdgesNoInitialValue() throws Exception {
-        /*
-         * Test fromCollection(edges) with no initial value for the vertices
-         */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-        Graph<Long, NullValue, Long> graph = 
Graph.fromCollection(TestGraphUtils.getLongLongEdges(),
-                       env);
-
-        graph.getVertices().writeAsCsv(resultPath);
-        env.execute();
-        // vertex values are NullValue, expected to appear as "(null)" in the CSV
-        expectedResult = "1,(null)\n" +
-                       "2,(null)\n" +
-                       "3,(null)\n" +
-                       "4,(null)\n" +
-                       "5,(null)\n";
-    }
-
-       @Test
-       public void testFromCollectionEdgesWithInitialValue() throws Exception {
-        /*
-         * Test fromCollection(edges) with vertices initialised by a
-         * function that takes the id and doubles it
-         */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, Long, Long> graph = 
Graph.fromCollection(TestGraphUtils.getLongLongEdges(),
-                new InitVerticesMapper(), env);
-
-        graph.getVertices().writeAsCsv(resultPath);
-        env.execute();
-        expectedResult = "1,2\n" +
-                       "2,4\n" +
-                       "3,6\n" +
-                       "4,8\n" +
-                       "5,10\n";
-    }
-
-       // Vertex initializer used above: returns twice the vertex id.
-       @SuppressWarnings("serial")
-       private static final class InitVerticesMapper implements 
MapFunction<Long, Long> {
-        public Long map(Long vertexId) {
-            return vertexId * 2;
-        }
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreation.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreation.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreation.java
deleted file mode 100644
index d3e3bda..0000000
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreation.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
-import org.apache.flink.graph.validation.InvalidVertexIdsValidator;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.NullValue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-/**
- * Tests for creating a Graph with Graph.fromDataSet(...) and for
- * Graph.validate(...) with an InvalidVertexIdsValidator.
- *
- * Every test writes its result to a temporary file and stores the text it
- * expects in expectedResult; the actual comparison is done in after().
- */
-@RunWith(Parameterized.class)
-public class TestGraphCreation extends MultipleProgramsTestBase {
-
-       // The execution mode is supplied by the Parameterized runner and handed to the base class.
-       public TestGraphCreation(MultipleProgramsTestBase.ExecutionMode mode){
-               super(mode);
-       }
-
-    // URI of the temporary file the test under execution writes to
-    private String resultPath;
-    // Expected file content; assigned by each test, read in after()
-    private String expectedResult;
-
-    @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       // Creates a fresh output file for each test.
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       // Runs after each test: checks the written file against expectedResult
-       // using the inherited compareResultsByLinesInMemory helper.
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expectedResult, resultPath);
-       }
-
-       @Test
-       public void testCreateWithoutVertexValues() throws Exception {
-       /*
-        * Test create() with edge dataset and no vertex values
-     */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, NullValue, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), env);
-
-               graph.getVertices().writeAsCsv(resultPath);
-               env.execute();
-               // vertex values are NullValue, expected to appear as "(null)" in the CSV
-               expectedResult = "1,(null)\n" +
-                                       "2,(null)\n" +
-                                       "3,(null)\n" +
-                                       "4,(null)\n" +
-                                       "5,(null)\n";
-       }
-
-       @Test
-       public void testCreateWithMapper() throws Exception {
-       /*
-        * Test create() with edge dataset and a mapper that assigns the id as 
value
-     */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env),
-                               new AssignIdAsValueMapper(), env);
-
-               graph.getVertices().writeAsCsv(resultPath);
-               env.execute();
-               expectedResult = "1,1\n" +
-                                       "2,2\n" +
-                                       "3,3\n" +
-                                       "4,4\n" +
-                                       "5,5\n";
-       }
-
-       @Test
-       public void testCreateWithCustomVertexValue() throws Exception {
-               /*
-                * Test create() with edge dataset and a mapper that assigns a 
parametrized custom vertex value
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, DummyCustomParameterizedType<Double>, Long> graph = 
Graph.fromDataSet(
-                               TestGraphUtils.getLongLongEdgeData(env), new 
AssignCustomVertexValueMapper(), env);
-
-               graph.getVertices().writeAsCsv(resultPath);
-               env.execute();
-               // the mapper sets tField = id * 2.0 and intField = id - 1;
-               // presumably printed as "(tField,intField)"
-               expectedResult = "1,(2.0,0)\n" +
-                               "2,(4.0,1)\n" +
-                               "3,(6.0,2)\n" +
-                               "4,(8.0,3)\n" +
-                               "5,(10.0,4)\n";
-       }
-
-       @Test
-       public void testValidate() throws Exception {
-               /*
-                * Test validate():
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Vertex<Long, Long>> vertices = 
TestGraphUtils.getLongLongVertexData(env);
-               DataSet<Edge<Long, Long>> edges = 
TestGraphUtils.getLongLongEdgeData(env);
-
-               Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, 
edges, env);
-               DataSet<Boolean> result = graph.validate(new 
InvalidVertexIdsValidator<Long, Long, Long>());
-
-               // the single Boolean result is written as text
-               result.writeAsText(resultPath);
-               env.execute();
-
-               expectedResult = "true\n";
-       }
-
-       @Test
-       public void testValidateWithInvalidIds() throws Exception {
-               /*
-                * Test validate() - invalid vertex ids
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Vertex<Long, Long>> vertices = 
TestGraphUtils.getLongLongInvalidVertexData(env);
-               DataSet<Edge<Long, Long>> edges = 
TestGraphUtils.getLongLongEdgeData(env);
-
-               Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, 
edges, env);
-               DataSet<Boolean> result = graph.validate(new 
InvalidVertexIdsValidator<Long, Long, Long>());
-               result.writeAsText(resultPath);
-               env.execute();
-
-               expectedResult = "false\n";
-       }
-
-       // Vertex initializer that returns the vertex id unchanged.
-       @SuppressWarnings("serial")
-       private static final class AssignIdAsValueMapper implements 
MapFunction<Long, Long> {
-               public Long map(Long vertexId) {
-                       return vertexId;
-               }
-       }
-
-       // Vertex initializer producing a DummyCustomParameterizedType<Double> with
-       // intField = id - 1 and tField = id * 2.0.
-       @SuppressWarnings("serial")
-       private static final class AssignCustomVertexValueMapper implements
-               MapFunction<Long, DummyCustomParameterizedType<Double>> {
-
-               // single instance that is mutated and returned on every map() call
-               DummyCustomParameterizedType<Double> dummyValue =
-                               new DummyCustomParameterizedType<Double>();
-
-               public DummyCustomParameterizedType<Double> map(Long vertexId) {
-                       dummyValue.setIntField(vertexId.intValue()-1);
-                       dummyValue.setTField(vertexId*2.0);
-                       return dummyValue;
-               }
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreationWithMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreationWithMapper.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreationWithMapper.java
deleted file mode 100644
index 67ff5cc..0000000
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreationWithMapper.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-/**
- * Tests for Graph.fromDataSet(edges, mapper, env), where the vertex values are
- * produced by a MapFunction applied to the vertex ids.
- *
- * Every test writes the vertices as CSV to a temporary file and stores the text
- * it expects in expectedResult; the actual comparison is done in after().
- */
-@RunWith(Parameterized.class)
-public class TestGraphCreationWithMapper extends MultipleProgramsTestBase {
-
-       // The execution mode is supplied by the Parameterized runner and handed to the base class.
-       public 
TestGraphCreationWithMapper(MultipleProgramsTestBase.ExecutionMode mode){
-               super(mode);
-       }
-
-    // URI of the temporary file the test under execution writes to
-    private String resultPath;
-    // Expected file content; assigned by each test, read in after()
-    private String expectedResult;
-
-    @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       // Creates a fresh output file for each test.
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       // Runs after each test: checks the written file against expectedResult
-       // using the inherited compareResultsByLinesInMemory helper.
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expectedResult, resultPath);
-       }
-
-       @Test
-       public void testWithDoubleValueMapper() throws Exception {
-               /*
-                * Test create() with edge dataset and a mapper that assigns a 
double constant as value
-            */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, Double, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env),
-                               new AssignDoubleValueMapper(), env);
-
-               graph.getVertices().writeAsCsv(resultPath);
-               env.execute();
-               expectedResult = "1,0.1\n" +
-                               "2,0.1\n" +
-                               "3,0.1\n" +
-                               "4,0.1\n" +
-                               "5,0.1\n";
-       }
-
-       @Test
-       public void testWithTuple2ValueMapper() throws Exception {
-               /*
-                * Test create() with edge dataset and a mapper that assigns a 
Tuple2 as value
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, Tuple2<Long, Long>, Long> graph = Graph.fromDataSet(
-                               TestGraphUtils.getLongLongEdgeData(env), new 
AssignTuple2ValueMapper(), env);
-
-               graph.getVertices().writeAsCsv(resultPath);
-               env.execute();
-               // the mapper yields (id * 2, 42) for every vertex
-               expectedResult = "1,(2,42)\n" +
-                               "2,(4,42)\n" +
-                               "3,(6,42)\n" +
-                               "4,(8,42)\n" +
-                               "5,(10,42)\n";
-       }
-
-       @Test
-       public void testWithConstantValueMapper() throws Exception {
-       /*
-        * Test create() with edge dataset with String key type
-        * and a mapper that assigns a double constant as value
-        */
-       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-       Graph<String, Double, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getStringLongEdgeData(env),
-                       new AssignDoubleConstantMapper(), env);
-
-       graph.getVertices().writeAsCsv(resultPath);
-       env.execute();
-       expectedResult = "1,0.1\n" +
-                       "2,0.1\n" +
-                       "3,0.1\n" +
-                       "4,0.1\n" +
-                       "5,0.1\n";
-       }
-
-       @Test
-       public void testWithDCustomValueMapper() throws Exception {
-               /*
-                * Test create() with edge dataset and a mapper that assigns a 
custom vertex value
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, DummyCustomType, Long> graph = Graph.fromDataSet(
-                               TestGraphUtils.getLongLongEdgeData(env), new 
AssignCustomValueMapper(), env);
-
-               graph.getVertices().writeAsCsv(resultPath);
-               env.execute();
-               // the mapper builds DummyCustomType(id - 1, false);
-               // presumably rendered as "(F,<int>)" -- confirm against DummyCustomType
-               expectedResult = "1,(F,0)\n" +
-                               "2,(F,1)\n" +
-                               "3,(F,2)\n" +
-                               "4,(F,3)\n" +
-                               "5,(F,4)\n";
-       }
-
-       // Assigns the constant 0.1 to every Long vertex id.
-       @SuppressWarnings("serial")
-       private static final class AssignDoubleValueMapper implements 
MapFunction<Long, Double> {
-               public Double map(Long value) {
-                       return 0.1d;
-               }
-       }
-
-       // Assigns the pair (id * 2, 42) to every vertex id.
-       @SuppressWarnings("serial")
-       private static final class AssignTuple2ValueMapper implements 
MapFunction<Long, Tuple2<Long, Long>> {
-               public Tuple2<Long, Long> map(Long vertexId) {
-                       return new Tuple2<Long, Long>(vertexId*2, 42l);
-               }
-       }
-
-       // Assigns the constant 0.1 to every String vertex id.
-       @SuppressWarnings("serial")
-       private static final class AssignDoubleConstantMapper implements 
MapFunction<String, Double> {
-               public Double map(String value) {
-                       return 0.1d;
-               }
-       }
-
-       // Assigns a new DummyCustomType(id - 1, false) to every vertex id.
-       @SuppressWarnings("serial")
-       private static final class AssignCustomValueMapper implements 
MapFunction<Long, DummyCustomType> {
-               public DummyCustomType map(Long vertexId) {
-                       return new DummyCustomType(vertexId.intValue()-1, 
false);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphMutations.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphMutations.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphMutations.java
deleted file mode 100644
index f53f51e..0000000
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphMutations.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestGraphMutations extends MultipleProgramsTestBase {
-
-       public TestGraphMutations(MultipleProgramsTestBase.ExecutionMode mode){
-               super(mode);
-       }
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expectedResult, resultPath);
-       }
-
-       @Test
-       public void testAddVertex() throws Exception {
-               /*
-                * Test addVertex() -- simple case
-                */     
-
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-
-               List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, 
Long>>();
-               edges.add(new Edge<Long, Long>(6L, 1L, 61L));
-               graph = graph.addVertex(new Vertex<Long, Long>(6L, 6L), edges);
-               graph.getEdges().writeAsCsv(resultPath);
-               env.execute();
-
-               expectedResult = "1,2,12\n" +
-                               "1,3,13\n" +
-                               "2,3,23\n" +
-                               "3,4,34\n" +
-                               "3,5,35\n" +
-                               "4,5,45\n" +
-                               "5,1,51\n" +
-                               "6,1,61\n";     
-       }
-
-       @Test
-       public void testAddVertexExisting() throws Exception {
-               /*
-                * Test addVertex() -- add an existing vertex
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               
-               List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, 
Long>>();
-               edges.add(new Edge<Long, Long>(1L, 5L, 15L));
-               graph = graph.addVertex(new Vertex<Long, Long>(1L, 1L), edges);
-               graph.getEdges().writeAsCsv(resultPath);
-               env.execute();
-
-               expectedResult = "1,2,12\n" +
-                                       "1,3,13\n" +
-                                       "1,5,15\n" +
-                                       "2,3,23\n" +
-                                       "3,4,34\n" +
-                                       "3,5,35\n" +
-                                       "4,5,45\n" +
-                                       "5,1,51\n";
-       }
-
-       @Test
-       public void testAddVertexNoEdges() throws Exception {
-               /*
-                * Test addVertex() -- add vertex with empty edge set
-                */     
-               
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, 
Long>>();
-               graph = graph.addVertex(new Vertex<Long, Long>(6L, 6L), edges);
-               graph.getVertices().writeAsCsv(resultPath);
-               env.execute();
-       
-               expectedResult = "1,1\n" +
-                       "2,2\n" +
-                       "3,3\n" +
-                       "4,4\n" +
-                       "5,5\n" +
-                       "6,6\n";
-       }
-
-       @Test
-       public void testRemoveVertex() throws Exception {
-               /*
-                * Test removeVertex() -- simple case
-                */
-
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               graph = graph.removeVertex(new Vertex<Long, Long>(5L, 5L));
-               graph.getEdges().writeAsCsv(resultPath);
-               env.execute();
-
-               expectedResult = "1,2,12\n" +
-                               "1,3,13\n" +
-                               "2,3,23\n" +
-                               "3,4,34\n";
-       }
-
-       @Test
-       public void testRemoveInvalidVertex() throws Exception {
-               /*
-                * Test removeVertex() -- remove an invalid vertex
-                */     
-               
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               graph = graph.removeVertex(new Vertex<Long, Long>(6L, 6L));
-               graph.getEdges().writeAsCsv(resultPath);
-               env.execute();
-
-               expectedResult = "1,2,12\n" +
-                               "1,3,13\n" +
-                               "2,3,23\n" +
-                               "3,4,34\n" +
-                               "3,5,35\n" +
-                               "4,5,45\n" +
-                               "5,1,51\n";
-       }
-       
-       @Test
-       public void testAddEdge() throws Exception {
-               /*
-                * Test addEdge() -- simple case
-                */
-               
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               graph = graph.addEdge(new Vertex<Long, Long>(6L, 6L), new 
Vertex<Long, Long>(1L, 1L),
-                               61L);
-               graph.getEdges().writeAsCsv(resultPath);
-               env.execute();
-
-               expectedResult = "1,2,12\n" +
-                               "1,3,13\n" +
-                               "2,3,23\n" +
-                               "3,4,34\n" +
-                               "3,5,35\n" +
-                               "4,5,45\n" +
-                               "5,1,51\n" +
-                               "6,1,61\n";     
-       }
-       
-       @Test
-       public void testAddExistingEdge() throws Exception {
-               /*
-                * Test addEdge() -- add already existing edge
-                */
-
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               graph = graph.addEdge(new Vertex<Long, Long>(1L, 1L), new 
Vertex<Long, Long>(2L, 2L),
-                               12L);
-               graph.getEdges().writeAsCsv(resultPath);
-               env.execute();
-
-               expectedResult = "1,2,12\n" +
-                               "1,2,12\n" +
-                               "1,3,13\n" +
-                               "2,3,23\n" +
-                               "3,4,34\n" +
-                               "3,5,35\n" +
-                               "4,5,45\n" +
-                               "5,1,51\n";     
-       }
-       
-       @Test
-       public void testRemoveVEdge() throws Exception {
-               /*
-                * Test removeEdge() -- simple case
-                */
-
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               graph = graph.removeEdge(new Edge<Long, Long>(5L, 1L, 51L));
-               graph.getEdges().writeAsCsv(resultPath);
-               env.execute();
-
-               expectedResult = "1,2,12\n" +
-                               "1,3,13\n" +
-                               "2,3,23\n" +
-                               "3,4,34\n" +
-                               "3,5,35\n" +
-                               "4,5,45\n";
-       }
-       
-       @Test
-       public void testRemoveInvalidEdge() throws Exception {
-               /*
-                * Test removeEdge() -- invalid edge
-                */
-
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               graph = graph.removeEdge(new Edge<Long, Long>(6L, 1L, 61L));
-               graph.getEdges().writeAsCsv(resultPath);
-               env.execute();
-
-               expectedResult = "1,2,12\n" +
-                               "1,3,13\n" +
-                               "2,3,23\n" +
-                               "3,4,34\n" +
-                               "3,5,35\n" +
-                               "4,5,45\n" +
-                               "5,1,51\n";
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java
deleted file mode 100644
index 6ab6928..0000000
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestGraphOperations extends MultipleProgramsTestBase {
-
-       public TestGraphOperations(MultipleProgramsTestBase.ExecutionMode mode){
-               super(mode);
-       }
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expectedResult, resultPath);
-       }
-
-       @Test
-       public void testUndirected() throws Exception {
-               /*
-                * Test getUndirected()
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-
-               graph.getUndirected().getEdges().writeAsCsv(resultPath);
-               env.execute();
-               expectedResult = "1,2,12\n" + "2,1,12\n" +
-                                       "1,3,13\n" + "3,1,13\n" +
-                                       "2,3,23\n" + "3,2,23\n" +
-                                       "3,4,34\n" + "4,3,34\n" +
-                                       "3,5,35\n" + "5,3,35\n" +
-                                       "4,5,45\n" + "5,4,45\n" +
-                                       "5,1,51\n" + "1,5,51\n";
-       }
-
-       @Test
-       public void testReverse() throws Exception {
-               /*
-                * Test reverse()
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-
-               graph.reverse().getEdges().writeAsCsv(resultPath);
-               env.execute();
-               expectedResult = "2,1,12\n" +
-                                       "3,1,13\n" +
-                                       "3,2,23\n" +
-                                       "4,3,34\n" +
-                                       "5,3,35\n" +
-                                       "5,4,45\n" +
-                                       "1,5,51\n";
-       }
-
-       @SuppressWarnings("serial")
-       @Test
-       public void testSubGraph() throws Exception {
-               /*
-                * Test subgraph:
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               graph.subgraph(new FilterFunction<Vertex<Long, Long>>() {
-                                                  public boolean 
filter(Vertex<Long, Long> vertex) throws Exception {
-                                                          return 
(vertex.getValue() > 2);
-                                                  }
-                                          },
-                               new FilterFunction<Edge<Long, Long>>() {
-                                       public boolean filter(Edge<Long, Long> 
edge) throws Exception {
-                                               return (edge.getValue() > 34);
-                                       }
-                               }).getEdges().writeAsCsv(resultPath);
-
-               env.execute();
-               expectedResult = "3,5,35\n" +
-                                       "4,5,45\n";
-       }
-
-       @SuppressWarnings("serial")
-       @Test
-       public void testFilterVertices() throws Exception {
-               /*
-                * Test filterOnVertices:
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               graph.filterOnVertices(new FilterFunction<Vertex<Long, Long>>() 
{
-                       public boolean filter(Vertex<Long, Long> vertex) throws 
Exception {
-                               return (vertex.getValue() > 2);
-                       }
-               }).getEdges().writeAsCsv(resultPath);
-
-               env.execute();
-               expectedResult =  "3,4,34\n" +
-                               "3,5,35\n" +
-                               "4,5,45\n";
-       }
-
-       @SuppressWarnings("serial")
-       @Test
-       public void testFilterEdges() throws Exception {
-               /*
-                * Test filterOnEdges:
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               graph.filterOnEdges(new FilterFunction<Edge<Long, Long>>() {
-                       public boolean filter(Edge<Long, Long> edge) throws 
Exception {
-                               return (edge.getValue() > 34);
-                       }
-               }).getEdges().writeAsCsv(resultPath);
-
-               env.execute();
-               expectedResult = "3,5,35\n" +
-                                       "4,5,45\n" +
-                                       "5,1,51\n";
-       }
-
-       @Test
-       public void testNumberOfVertices() throws Exception {
-               /*
-                * Test numberOfVertices()
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               graph.numberOfVertices().writeAsText(resultPath);
-
-               env.execute();
-               expectedResult = "5";
-       }
-
-       @Test
-       public void testNumberOfEdges() throws Exception {
-               /*
-                * Test numberOfEdges()
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               graph.numberOfEdges().writeAsText(resultPath);
-
-               env.execute();
-               expectedResult = "7";
-       }
-
-       @Test
-       public void testVertexIds() throws Exception {
-               /*
-                * Test getVertexIds()
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               graph.getVertexIds().writeAsText(resultPath);
-
-               env.execute();
-               expectedResult = "1\n2\n3\n4\n5\n";
-       }
-
-       @Test
-       public void testEdgesIds() throws Exception {
-               /*
-                * Test getEdgeIds()
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               graph.getEdgeIds().writeAsCsv(resultPath);
-
-               env.execute();
-               expectedResult = "1,2\n" + "1,3\n" +
-                               "2,3\n" + "3,4\n" +
-                               "3,5\n" + "4,5\n" +
-                               "5,1\n";
-       }
-
-       @Test
-       public void testUnion() throws Exception {
-               /*
-                * Test union()
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-
-               List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, 
Long>>();
-               List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, 
Long>>();
-
-               vertices.add(new Vertex<Long, Long>(6L, 6L));
-               edges.add(new Edge<Long, Long>(6L, 1L, 61L));
-
-               graph = graph.union(Graph.fromCollection(vertices, edges, env));
-
-               graph.getEdges().writeAsCsv(resultPath);
-
-               env.execute();
-
-               expectedResult = "1,2,12\n" +
-                                       "1,3,13\n" +
-                                       "2,3,23\n" +
-                                       "3,4,34\n" +
-                                       "3,5,35\n" +
-                                       "4,5,45\n" +
-                                       "5,1,51\n" +
-                                       "6,1,61\n";
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
deleted file mode 100644
index 0be97be..0000000
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
+++ /dev/null
@@ -1,519 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
-import org.apache.flink.graph.utils.EdgeToTuple3Map;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestJoinWithEdges extends MultipleProgramsTestBase {
-
-       public TestJoinWithEdges(MultipleProgramsTestBase.ExecutionMode mode){
-               super(mode);
-       }
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expectedResult, resultPath);
-       }
-
-       @Test
-       public void testWithEdgesInputDataset() throws Exception {
-               /*
-                * Test joinWithEdges with the input DataSet parameter identical
-                * to the edge DataSet
-                */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges()
-                        .map(new EdgeToTuple3Map<Long, Long>()), new 
AddValuesMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,24\n" +
-                       "1,3,26\n" +
-                       "2,3,46\n" +
-                       "3,4,68\n" +
-                       "3,5,70\n" +
-                       "4,5,90\n" +
-                       "5,1,102\n";
-    }
-
-       @Test
-       public void testWithLessElements() throws Exception {
-           /*
-                * Test joinWithEdges with the input DataSet passed as a 
parameter containing
-                * less elements than the edge DataSet, but of the same type
-                */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = 
graph.joinWithEdges(graph.getEdges().first(3)
-                        .map(new EdgeToTuple3Map<Long, Long>()), new 
AddValuesMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,24\n" +
-                       "1,3,26\n" +
-                       "2,3,46\n" +
-                       "3,4,34\n" +
-                       "3,5,35\n" +
-                       "4,5,45\n" +
-                       "5,1,51\n";
-    }
-
-       @Test
-       public void testWithLessElementsDifferentType() throws Exception {
-           /*
-                * Test joinWithEdges with the input DataSet passed as a 
parameter containing
-                * less elements than the edge DataSet and of a different 
type(Boolean)
-                */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = 
graph.joinWithEdges(graph.getEdges().first(3)
-                        .map(new BooleanEdgeValueMapper()), new 
DoubleIfTrueMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,24\n" +
-                       "1,3,26\n" +
-                       "2,3,46\n" +
-                       "3,4,34\n" +
-                       "3,5,35\n" +
-                       "4,5,45\n" +
-                       "5,1,51\n";
-    }
-
-       @Test
-       public void testWithNoCommonKeys() throws Exception {
-           /*
-                * Test joinWithEdges with the input DataSet containing 
different keys than the edge DataSet
-                * - the iterator becomes empty.
-                */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = 
graph.joinWithEdges(TestGraphUtils.getLongLongLongTuple3Data(env),
-                new DoubleValueMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,24\n" +
-                       "1,3,26\n" +
-                       "2,3,46\n" +
-                       "3,4,68\n" +
-                       "3,5,35\n" +
-                       "4,5,45\n" +
-                       "5,1,51\n";
-    }
-
-       @Test
-       public void testWithCustomType() throws Exception {
-           /*
-            * Test joinWithEdges with a DataSet containing custom parametrised
-            * type input values (TestGraphUtils.getLongLongCustomTuple3Data);
-            * the joined values are passed through CustomValueMapper.
-            *
-            * The edges of the resulting graph are registered as a CSV sink on
-            * resultPath, the plan is run with env.execute(), and
-            * expectedResult is then set to the expected CSV content.
-            */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = 
graph.joinWithEdges(TestGraphUtils.getLongLongCustomTuple3Data(env),
-                new CustomValueMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,10\n" +
-                       "1,3,20\n" +
-                       "2,3,30\n" +
-                       "3,4,40\n" +
-                       "3,5,35\n" +
-                       "4,5,45\n" +
-                       "5,1,51\n";
-    }
-
-       @Test
-       public void testWithEdgesOnSource() throws Exception {
-           /*
-            * Test joinWithEdgesOnSource with the input DataSet parameter
-            * identical to the edge DataSet: every edge of the graph is
-            * projected to a (source, value) pair by
-            * ProjectSourceAndValueMapper and the joined values are combined
-            * with AddValuesMapper.
-            *
-            * The edges of the resulting graph are registered as a CSV sink on
-            * resultPath, the plan is run with env.execute(), and
-            * expectedResult is then set to the expected CSV content.
-            */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnSource(graph.getEdges()
-                        .map(new ProjectSourceAndValueMapper()), new 
AddValuesMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,24\n" +
-                       "1,3,25\n" +
-                       "2,3,46\n" +
-                       "3,4,68\n" +
-                       "3,5,69\n" +
-                       "4,5,90\n" +
-                       "5,1,102\n";
-    }
-
-       @Test
-       public void testOnSourceWithLessElements() throws Exception {
-           /*
-            * Test joinWithEdgesOnSource with the input DataSet passed as a
-            * parameter containing fewer elements than the edge DataSet, but
-            * of the same type: only the result of graph.getEdges().first(3)
-            * is projected by ProjectSourceAndValueMapper, and the joined
-            * values are combined with AddValuesMapper.
-            *
-            * The edges of the resulting graph are registered as a CSV sink on
-            * resultPath, the plan is run with env.execute(), and
-            * expectedResult is then set to the expected CSV content.
-            */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnSource(graph.getEdges().first(3)
-                        .map(new ProjectSourceAndValueMapper()), new 
AddValuesMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,24\n" +
-                       "1,3,25\n" +
-                       "2,3,46\n" +
-                       "3,4,34\n" +
-                       "3,5,35\n" +
-                       "4,5,45\n" +
-                       "5,1,51\n";
-    }
-
-       @Test
-       public void testOnSourceWithDifferentType() throws Exception {
-           /*
-            * Test joinWithEdgesOnSource with the input DataSet passed as a
-            * parameter containing fewer elements than the edge DataSet and of
-            * a different type (Boolean): the result of
-            * graph.getEdges().first(3) is mapped to (source, true) pairs by
-            * ProjectSourceWithTrueMapper and the joined values are passed
-            * through DoubleIfTrueMapper.
-            *
-            * The edges of the resulting graph are registered as a CSV sink on
-            * resultPath, the plan is run with env.execute(), and
-            * expectedResult is then set to the expected CSV content.
-            */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnSource(graph.getEdges().first(3)
-                        .map(new ProjectSourceWithTrueMapper()), new 
DoubleIfTrueMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,24\n" +
-                       "1,3,26\n" +
-                       "2,3,46\n" +
-                       "3,4,34\n" +
-                       "3,5,35\n" +
-                       "4,5,45\n" +
-                       "5,1,51\n";
-    }
-
-       @Test
-       public void testOnSourceWithNoCommonKeys() throws Exception {
-           /*
-            * Test joinWithEdgesOnSource with an input DataSet (taken from
-            * TestGraphUtils.getLongLongTuple2SourceData) that contains keys
-            * different from those of the edge DataSet; the joined values are
-            * passed through DoubleValueMapper.
-            *
-            * The edges of the resulting graph are registered as a CSV sink on
-            * resultPath, the plan is run with env.execute(), and
-            * expectedResult is then set to the expected CSV content.
-            *
-            * NOTE(review): the original note here read "the iterator becomes
-            * empty" for keys without a match - confirm against
-            * joinWithEdgesOnSource.
-            */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnSource(TestGraphUtils.getLongLongTuple2SourceData(env),
-                new DoubleValueMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,20\n" +
-                       "1,3,20\n" +
-                       "2,3,60\n" +
-                       "3,4,80\n" +
-                       "3,5,80\n" +
-                       "4,5,120\n" +
-                       "5,1,51\n";
-    }
-
-       @Test
-       public void testOnSourceWithCustom() throws Exception {
-           /*
-            * Test joinWithEdgesOnSource with a DataSet containing custom
-            * parametrised type input values
-            * (TestGraphUtils.getLongCustomTuple2SourceData); the joined values
-            * are passed through CustomValueMapper.
-            *
-            * The edges of the resulting graph are registered as a CSV sink on
-            * resultPath, the plan is run with env.execute(), and
-            * expectedResult is then set to the expected CSV content.
-            */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnSource(TestGraphUtils.getLongCustomTuple2SourceData(env),
-                new CustomValueMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,10\n" +
-                       "1,3,10\n" +
-                       "2,3,30\n" +
-                       "3,4,40\n" +
-                       "3,5,40\n" +
-                       "4,5,45\n" +
-                       "5,1,51\n";
-    }
-
-       @Test
-       public void testWithEdgesOnTarget() throws Exception {
-    /*
-     * Test joinWithEdgesOnTarget with the input DataSet parameter identical
-     * to the edge DataSet: every edge of the graph is projected to a
-     * (target, value) pair by ProjectTargetAndValueMapper and the joined
-     * values are combined with AddValuesMapper.
-     *
-     * The edges of the resulting graph are registered as a CSV sink on
-     * resultPath, the plan is run with env.execute(), and expectedResult is
-     * then set to the expected CSV content.
-     */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnTarget(graph.getEdges()
-                        .map(new ProjectTargetAndValueMapper()), new 
AddValuesMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,24\n" +
-                       "1,3,26\n" +
-                       "2,3,36\n" +
-                       "3,4,68\n" +
-                       "3,5,70\n" +
-                       "4,5,80\n" +
-                       "5,1,102\n";
-    }
-
-       @Test
-       public void testWithOnTargetWithLessElements() throws Exception {
-           /*
-            * Test joinWithEdgesOnTarget with the input DataSet passed as a
-            * parameter containing fewer elements than the edge DataSet, but
-            * of the same type: only the result of graph.getEdges().first(3)
-            * is projected by ProjectTargetAndValueMapper, and the joined
-            * values are combined with AddValuesMapper.
-            *
-            * The edges of the resulting graph are registered as a CSV sink on
-            * resultPath and the plan is run exactly once with env.execute();
-            * registering the sink and executing a second time would only
-            * re-run the same job against the same output path.
-            * expectedResult is then set to the expected CSV content.
-            */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
-                        .map(new ProjectTargetAndValueMapper()), new 
AddValuesMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,24\n" +
-                       "1,3,26\n" +
-                       "2,3,36\n" +
-                       "3,4,34\n" +
-                       "3,5,35\n" +
-                       "4,5,45\n" +
-                       "5,1,51\n";
-    }
-
-       @Test
-       public void testOnTargetWithDifferentType() throws Exception {
-           /*
-            * Test joinWithEdgesOnTarget with the input DataSet passed as a
-            * parameter containing fewer elements than the edge DataSet and of
-            * a different type (Boolean): the result of
-            * graph.getEdges().first(3) is mapped to (target, true) pairs by
-            * ProjectTargetWithTrueMapper and the joined values are passed
-            * through DoubleIfTrueMapper.
-            *
-            * The edges of the resulting graph are registered as a CSV sink on
-            * resultPath, the plan is run with env.execute(), and
-            * expectedResult is then set to the expected CSV content.
-            */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
-                        .map(new ProjectTargetWithTrueMapper()), new 
DoubleIfTrueMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,24\n" +
-                       "1,3,26\n" +
-                       "2,3,46\n" +
-                       "3,4,34\n" +
-                       "3,5,35\n" +
-                       "4,5,45\n" +
-                       "5,1,51\n";
-    }
-
-       @Test
-       public void testOnTargetWithNoCommonKeys() throws Exception {
-           /*
-            * Test joinWithEdgesOnTarget with an input DataSet (taken from
-            * TestGraphUtils.getLongLongTuple2TargetData) that contains keys
-            * different from those of the edge DataSet; the joined values are
-            * passed through DoubleValueMapper.
-            *
-            * The edges of the resulting graph are registered as a CSV sink on
-            * resultPath, the plan is run with env.execute(), and
-            * expectedResult is then set to the expected CSV content.
-            *
-            * NOTE(review): the original note here read "the iterator becomes
-            * empty" for keys without a match - confirm against
-            * joinWithEdgesOnTarget.
-            */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnTarget(TestGraphUtils.getLongLongTuple2TargetData(env),
-                new DoubleValueMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,20\n" +
-                       "1,3,40\n" +
-                       "2,3,40\n" +
-                       "3,4,80\n" +
-                       "3,5,35\n" +
-                       "4,5,45\n" +
-                       "5,1,140\n";
-    }
-
-       @Test
-       public void testOnTargetWithCustom() throws Exception {
-           /*
-            * Test joinWithEdgesOnTarget with a DataSet containing custom
-            * parametrised type input values
-            * (TestGraphUtils.getLongCustomTuple2TargetData); the joined values
-            * are passed through CustomValueMapper.
-            *
-            * The edges of the resulting graph are registered as a CSV sink on
-            * resultPath, the plan is run with env.execute(), and
-            * expectedResult is then set to the expected CSV content.
-            */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnTarget(TestGraphUtils.getLongCustomTuple2TargetData(env),
-                new CustomValueMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,10\n" +
-                       "1,3,20\n" +
-                       "2,3,20\n" +
-                       "3,4,40\n" +
-                       "3,5,35\n" +
-                       "4,5,45\n" +
-                       "5,1,51\n";
-    }
-
-       @SuppressWarnings("serial")
-       private static final class AddValuesMapper implements 
MapFunction<Tuple2<Long, Long>, Long> {
-               public Long map(Tuple2<Long, Long> tuple) throws Exception {
-                       return tuple.f0 + tuple.f1;
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class BooleanEdgeValueMapper implements 
MapFunction<Edge<Long, Long>, Tuple3<Long, Long, Boolean>> {
-        public Tuple3<Long, Long, Boolean> map(Edge<Long, Long> edge) throws 
Exception {
-            return new Tuple3<Long, Long, Boolean>(edge.getSource(),
-                    edge.getTarget(), true);
-        }
-    }
-
-       @SuppressWarnings("serial")
-       private static final class DoubleIfTrueMapper implements 
MapFunction<Tuple2<Long, Boolean>, Long> {
-        public Long map(Tuple2<Long, Boolean> tuple) throws Exception {
-            if(tuple.f1) {
-                return tuple.f0 * 2;
-            }
-            else {
-                return tuple.f0;
-            }
-        }
-    }
-
-       @SuppressWarnings("serial")
-       private static final class DoubleValueMapper implements 
MapFunction<Tuple2<Long, Long>, Long> {
-        public Long map(Tuple2<Long, Long> tuple) throws Exception {
-            return tuple.f1 * 2;
-        }
-    }
-
-       @SuppressWarnings("serial")
-       private static final class CustomValueMapper implements 
MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> {
-        public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> 
tuple) throws Exception {
-            return (long) tuple.f1.getIntField();
-        }
-    }
-
-       @SuppressWarnings("serial")
-       private static final class ProjectSourceAndValueMapper implements 
MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> {
-        public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception {
-            return new Tuple2<Long, Long>(edge.getSource(), edge.getValue());
-        }
-    }
-
-       @SuppressWarnings("serial")
-       private static final class ProjectSourceWithTrueMapper implements 
MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> {
-        public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws 
Exception {
-            return new Tuple2<Long, Boolean>(edge.getSource(), true);
-        }
-    }
-
-       @SuppressWarnings("serial")
-       private static final class ProjectTargetAndValueMapper implements 
MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> {
-        public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception {
-            return new Tuple2<Long, Long>(edge.getTarget(), edge.getValue());
-        }
-    }
-
-       @SuppressWarnings("serial")
-       private static final class ProjectTargetWithTrueMapper implements 
MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> {
-        public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws 
Exception {
-            return new Tuple2<Long, Boolean>(edge.getTarget(), true);
-        }
-    }
-}
\ No newline at end of file

Reply via email to