http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java
deleted file mode 100644
index 8b0db35..0000000
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
-import org.apache.flink.graph.utils.VertexToTuple2Map;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestJoinWithVertices extends MultipleProgramsTestBase {
-
-       public TestJoinWithVertices(MultipleProgramsTestBase.ExecutionMode 
mode){
-               super(mode);
-       }
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expectedResult, resultPath);
-       }
-
-       @Test
-       public void testJoinWithVertexSet() throws Exception {
-               /*
-                * Test joinWithVertices with the input DataSet parameter 
identical
-                * to the vertex DataSet
-                */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = 
graph.joinWithVertices(graph.getVertices()
-                        .map(new VertexToTuple2Map<Long, Long>()), new 
AddValuesMapper());
-
-        result.getVertices().writeAsCsv(resultPath);
-        env.execute();
-
-       expectedResult = "1,2\n" +
-                       "2,4\n" +
-                       "3,6\n" +
-                       "4,8\n" +
-                       "5,10\n";
-    }
-
-       @Test
-       public void testWithLessElements() throws Exception {
-       /*
-        * Test joinWithVertices with the input DataSet passed as a parameter 
containing
-        * less elements than the vertex DataSet, but of the same type
-        */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = 
graph.joinWithVertices(graph.getVertices().first(3)
-                        .map(new VertexToTuple2Map<Long, Long>()), new 
AddValuesMapper());
-
-        result.getVertices().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2\n" +
-                       "2,4\n" +
-                       "3,6\n" +
-                       "4,4\n" +
-                       "5,5\n";
-    }
-
-       @Test
-       public void testWithDifferentType() throws Exception {
-       /*
-        * Test joinWithVertices with the input DataSet passed as a parameter 
containing
-        * less elements than the vertex DataSet and of a different 
type(Boolean)
-        */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = 
graph.joinWithVertices(graph.getVertices().first(3)
-                        .map(new ProjectIdWithTrue()), new 
DoubleIfTrueMapper());
-
-        result.getVertices().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2\n" +
-                       "2,4\n" +
-                       "3,6\n" +
-                       "4,4\n" +
-                       "5,5\n";
-    }
-
-       @Test
-       public void testWithDifferentKeys() throws Exception {
-               /*
-                * Test joinWithVertices with an input DataSet containing 
different keys than the vertex DataSet
-                * - the iterator becomes empty.
-                */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = 
graph.joinWithVertices(TestGraphUtils.getLongLongTuple2Data(env),
-                new ProjectSecondMapper());
-
-        result.getVertices().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,10\n" +
-                       "2,20\n" +
-                       "3,30\n" +
-                       "4,40\n" +
-                       "5,5\n";
-    }
-
-       @Test
-       public void testWithCustomType() throws Exception {
-               /*
-                * Test joinWithVertices with a DataSet containing custom 
parametrised type input values
-                */
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = 
graph.joinWithVertices(TestGraphUtils.getLongCustomTuple2Data(env),
-                new CustomValueMapper());
-
-        result.getVertices().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,10\n" +
-                       "2,20\n" +
-                       "3,30\n" +
-                       "4,40\n" +
-                       "5,5\n";
-    }
-
-       @SuppressWarnings("serial")
-       private static final class AddValuesMapper implements 
MapFunction<Tuple2<Long, Long>, Long> {
-               public Long map(Tuple2<Long, Long> tuple) throws Exception {
-                       return tuple.f0 + tuple.f1;
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class ProjectIdWithTrue implements 
MapFunction<Vertex<Long, Long>, Tuple2<Long, Boolean>> {
-        public Tuple2<Long, Boolean> map(Vertex<Long, Long> vertex) throws 
Exception {
-            return new Tuple2<Long, Boolean>(vertex.getId(), true);
-        }
-    }
-
-       @SuppressWarnings("serial")
-       private static final class DoubleIfTrueMapper implements 
MapFunction<Tuple2<Long, Boolean>, Long> {
-        public Long map(Tuple2<Long, Boolean> tuple) throws Exception {
-            if(tuple.f1) {
-                return tuple.f0 * 2;
-            }
-            else {
-                return tuple.f0;
-            }
-        }
-    }
-
-       @SuppressWarnings("serial")
-       private static final class ProjectSecondMapper implements 
MapFunction<Tuple2<Long, Long>, Long> {
-        public Long map(Tuple2<Long, Long> tuple) throws Exception {
-            return tuple.f1;
-        }
-    }
-
-       @SuppressWarnings("serial")
-       private static final class CustomValueMapper implements 
MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> {
-        public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> 
tuple) throws Exception {
-            return (long) tuple.f1.getIntField();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java
deleted file mode 100644
index 9eccecc..0000000
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestMapEdges extends MultipleProgramsTestBase {
-
-       public TestMapEdges(MultipleProgramsTestBase.ExecutionMode mode){
-               super(mode);
-       }
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expectedResult, resultPath);
-       }
-
-       @Test
-       public void testWithSameValue() throws Exception {
-               /*
-                * Test mapEdges() keeping the same value type
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               
-               DataSet<Edge<Long, Long>> mappedEdges = graph.mapEdges(new 
AddOneMapper()).getEdges();
-               
-               mappedEdges.writeAsCsv(resultPath);
-               env.execute();
-               expectedResult = "1,2,13\n" +
-                               "1,3,14\n" +
-                               "2,3,24\n" +
-                               "3,4,35\n" +
-                               "3,5,36\n" + 
-                               "4,5,46\n" + 
-                               "5,1,52\n";
-       }
-
-       @Test
-       public void testWithStringValue() throws Exception {
-               /*
-                * Test mapEdges() and change the value type to String
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               
-               DataSet<Edge<Long, String>> mappedEdges = graph.mapEdges(new 
ToStringMapper()).getEdges();
-               
-               mappedEdges.writeAsCsv(resultPath);
-               env.execute();
-               expectedResult = "1,2,string(12)\n" +
-                               "1,3,string(13)\n" +
-                               "2,3,string(23)\n" +
-                               "3,4,string(34)\n" +
-                               "3,5,string(35)\n" + 
-                               "4,5,string(45)\n" + 
-                               "5,1,string(51)\n";
-       }
-
-       @Test
-       public void testWithTuple1Type() throws Exception {
-               /*
-                * Test mapEdges() and change the value type to a Tuple1
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               
-               DataSet<Edge<Long, Tuple1<Long>>> mappedEdges = 
graph.mapEdges(new ToTuple1Mapper()).getEdges();
-               
-               mappedEdges.writeAsCsv(resultPath);
-               env.execute();
-
-               expectedResult = "1,2,(12)\n" +
-                               "1,3,(13)\n" +
-                               "2,3,(23)\n" +
-                               "3,4,(34)\n" +
-                               "3,5,(35)\n" + 
-                               "4,5,(45)\n" + 
-                               "5,1,(51)\n";
-       }
-
-       @Test
-       public void testWithCustomType() throws Exception {
-               /*
-                * Test mapEdges() and change the value type to a custom type
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               
-               DataSet<Edge<Long, DummyCustomType>> mappedEdges = 
graph.mapEdges(new ToCustomTypeMapper()).getEdges();
-               
-               mappedEdges.writeAsCsv(resultPath);
-               env.execute();
-
-               expectedResult = "1,2,(T,12)\n" +
-                       "1,3,(T,13)\n" +
-                       "2,3,(T,23)\n" +
-                       "3,4,(T,34)\n" +
-                       "3,5,(T,35)\n" + 
-                       "4,5,(T,45)\n" + 
-                       "5,1,(T,51)\n";
-       }
-
-       @Test
-       public void testWithParametrizedCustomType() throws Exception {
-               /*
-                * Test mapEdges() and change the value type to a parameterized 
custom type
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               
-               DataSet<Edge<Long, DummyCustomParameterizedType<Double>>> 
mappedEdges = graph.mapEdges(
-                               new 
ToCustomParametrizedTypeMapper()).getEdges();
-               
-               mappedEdges.writeAsCsv(resultPath);
-               env.execute();
-       
-               expectedResult = "1,2,(12.0,12)\n" +
-                       "1,3,(13.0,13)\n" +
-                       "2,3,(23.0,23)\n" +
-                       "3,4,(34.0,34)\n" +
-                       "3,5,(35.0,35)\n" + 
-                       "4,5,(45.0,45)\n" + 
-                       "5,1,(51.0,51)\n";
-       }
-
-       @SuppressWarnings("serial")
-       private static final class AddOneMapper implements 
MapFunction<Edge<Long, Long>, Long> {
-               public Long map(Edge<Long, Long> edge) throws Exception {
-                       return edge.getValue()+1;
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class ToStringMapper implements 
MapFunction<Edge<Long, Long>, String> {
-               public String map(Edge<Long, Long> edge) throws Exception {
-                       return String.format("string(%d)", edge.getValue());
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class ToTuple1Mapper implements 
MapFunction<Edge<Long, Long>, Tuple1<Long>> {
-               public Tuple1<Long> map(Edge<Long, Long> edge) throws Exception 
{
-                       Tuple1<Long> tupleValue = new Tuple1<Long>();
-                       tupleValue.setFields(edge.getValue());
-                       return tupleValue;
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class ToCustomTypeMapper implements 
MapFunction<Edge<Long, Long>, DummyCustomType> {
-               public DummyCustomType map(Edge<Long, Long> edge) throws 
Exception {
-                       DummyCustomType dummyValue = new DummyCustomType();
-                       dummyValue.setIntField(edge.getValue().intValue());     
                                        
-                       return dummyValue;
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class ToCustomParametrizedTypeMapper implements 
MapFunction<Edge<Long, Long>, 
-               DummyCustomParameterizedType<Double>> {
-
-               public DummyCustomParameterizedType<Double> map(Edge<Long, 
Long> edge) throws Exception {
-                       DummyCustomParameterizedType<Double> dummyValue = new 
DummyCustomParameterizedType<Double>();
-                       dummyValue.setIntField(edge.getValue().intValue());
-                       dummyValue.setTField(new Double(edge.getValue()));      
                                        
-                       return dummyValue;
-               }
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java
deleted file mode 100644
index c4d44b0..0000000
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestMapVertices extends MultipleProgramsTestBase {
-
-       public TestMapVertices(MultipleProgramsTestBase.ExecutionMode mode){
-               super(mode);
-       }
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expectedResult, resultPath);
-       }
-
-       @Test
-       public void testWithSameValue() throws Exception {
-               /*
-                * Test mapVertices() keeping the same value type
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               
-               DataSet<Vertex<Long, Long>> mappedVertices = 
graph.mapVertices(new AddOneMapper()).getVertices();
-               
-               mappedVertices.writeAsCsv(resultPath);
-               env.execute();
-               expectedResult = "1,2\n" +
-                       "2,3\n" +
-                       "3,4\n" +
-                       "4,5\n" +
-                       "5,6\n";
-       }
-
-       @Test
-       public void testWithStringValue() throws Exception {
-               /*
-                * Test mapVertices() and change the value type to String
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               
-               DataSet<Vertex<Long, String>> mappedVertices = 
graph.mapVertices(new ToStringMapper()).getVertices();
-               
-               mappedVertices.writeAsCsv(resultPath);
-               env.execute();
-
-               expectedResult = "1,one\n" +
-                       "2,two\n" +
-                       "3,three\n" +
-                       "4,four\n" +
-                       "5,five\n";
-       }
-
-       @Test
-       public void testWithtuple1Value() throws Exception {
-               /*
-                * Test mapVertices() and change the value type to a Tuple1
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               
-               DataSet<Vertex<Long, Tuple1<Long>>> mappedVertices = 
graph.mapVertices(new ToTuple1Mapper()).getVertices();
-               
-               mappedVertices.writeAsCsv(resultPath);
-               env.execute();
-
-               expectedResult = "1,(1)\n" +
-                       "2,(2)\n" +
-                       "3,(3)\n" +
-                       "4,(4)\n" +
-                       "5,(5)\n";
-       }
-
-       @Test
-       public void testWithCustomType() throws Exception {
-               /*
-                * Test mapVertices() and change the value type to a custom type
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               
-               DataSet<Vertex<Long, DummyCustomType>> mappedVertices = 
graph.mapVertices(new ToCustomTypeMapper()).getVertices();
-               
-               mappedVertices.writeAsCsv(resultPath);
-               env.execute();
-
-               expectedResult = "1,(T,1)\n" +
-                       "2,(T,2)\n" +
-                       "3,(T,3)\n" +
-                       "4,(T,4)\n" +
-                       "5,(T,5)\n";
-       }
-
-       @Test
-       public void testWithCustomParametrizedType() throws Exception {
-               /*
-                * Test mapVertices() and change the value type to a 
parameterized custom type
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               
-               DataSet<Vertex<Long, DummyCustomParameterizedType<Double>>> 
mappedVertices = graph.mapVertices(
-                               new 
ToCustomParametrizedTypeMapper()).getVertices();
-               
-               mappedVertices.writeAsCsv(resultPath);
-               env.execute();
-       
-               expectedResult = "1,(1.0,1)\n" +
-                       "2,(2.0,2)\n" +
-                       "3,(3.0,3)\n" +
-                       "4,(4.0,4)\n" +
-                       "5,(5.0,5)\n";
-       }
-
-       @SuppressWarnings("serial")
-       private static final class AddOneMapper implements 
MapFunction<Vertex<Long, Long>, Long> {
-               public Long map(Vertex<Long, Long> value) throws Exception {
-                       return value.getValue()+1;
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class ToStringMapper implements 
MapFunction<Vertex<Long, Long>, String> {
-               public String map(Vertex<Long, Long> vertex) throws Exception {
-                       String stringValue;
-                       if (vertex.getValue() == 1) {
-                               stringValue = "one";
-                       }
-                       else if (vertex.getValue() == 2) {
-                               stringValue = "two";
-                       }
-                       else if (vertex.getValue() == 3) {
-                               stringValue = "three";
-                       }
-                       else if (vertex.getValue() == 4) {
-                               stringValue = "four";
-                       }
-                       else if (vertex.getValue() == 5) {
-                               stringValue = "five";
-                       }
-                       else {
-                               stringValue = "";
-                       }
-                       return stringValue;
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class ToTuple1Mapper implements 
MapFunction<Vertex<Long, Long>, Tuple1<Long>> {
-               public Tuple1<Long> map(Vertex<Long, Long> vertex) throws 
Exception {
-                       Tuple1<Long> tupleValue = new Tuple1<Long>();
-                       tupleValue.setFields(vertex.getValue());
-                       return tupleValue;
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class ToCustomTypeMapper implements 
MapFunction<Vertex<Long, Long>, DummyCustomType> {
-               public DummyCustomType map(Vertex<Long, Long> vertex) throws 
Exception {
-                       DummyCustomType dummyValue = new DummyCustomType();
-                       dummyValue.setIntField(vertex.getValue().intValue());   
                                        
-                       return dummyValue;
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class ToCustomParametrizedTypeMapper implements 
MapFunction<Vertex<Long, Long>, 
-               DummyCustomParameterizedType<Double>> {
-               
-               public DummyCustomParameterizedType<Double> map(Vertex<Long, 
Long> vertex) throws Exception {
-                       DummyCustomParameterizedType<Double> dummyValue = new 
DummyCustomParameterizedType<Double>();
-                       dummyValue.setIntField(vertex.getValue().intValue());
-                       dummyValue.setTField(new Double(vertex.getValue()));    
                                        
-                       return dummyValue;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java
deleted file mode 100644
index 7a02ffe..0000000
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.EdgesFunction;
-import org.apache.flink.graph.EdgesFunctionWithVertexValue;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestReduceOnEdgesMethods extends MultipleProgramsTestBase {
-
-       public TestReduceOnEdgesMethods(MultipleProgramsTestBase.ExecutionMode 
mode){
-               super(mode);
-       }
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expectedResult, resultPath);
-       }
-
-       @Test
-       public void testLowestWeightOutNeighbor() throws Exception {
-               /*
-                * Get the lowest-weight out-neighbor
-                * for each vertex
-         */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-
-               DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
-                               graph.reduceOnEdges(new 
SelectMinWeightNeighbor(), EdgeDirection.OUT);
-               verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
-               env.execute();
-       
-               expectedResult = "1,2\n" +
-                               "2,3\n" + 
-                               "3,4\n" +
-                               "4,5\n" + 
-                               "5,1\n";
-       }
-
-       @Test
-       public void testLowestWeightInNeighbor() throws Exception {
-               /*
-                * Get the lowest-weight in-neighbor
-                * for each vertex
-         */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-
-               DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
-                               graph.reduceOnEdges(new 
SelectMinWeightInNeighbor(), EdgeDirection.IN);
-               verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
-               env.execute();
-
-               expectedResult = "1,5\n" +
-                                       "2,1\n" + 
-                                       "3,1\n" +
-                                       "4,3\n" + 
-                                       "5,3\n";
-       }
-
-       @Test
-       public void testMaxWeightEdge() throws Exception {
-               /*
-                * Get the maximum weight among all edges
-                * of a vertex
-         */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-
-               DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = 
-                               graph.reduceOnEdges(new 
SelectMaxWeightNeighbor(), EdgeDirection.ALL);
-               verticesWithMaxEdgeWeight.writeAsCsv(resultPath);
-               env.execute();
-
-               expectedResult = "1,51\n" +
-                               "2,23\n" + 
-                               "3,35\n" +
-                               "4,45\n" + 
-                               "5,51\n";
-       }
-
-       @Test
-       public void testLowestWeightOutNeighborNoValue() throws Exception {
-               /*
-                * Get the lowest-weight out-neighbor
-                * for each vertex
-         */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-
-               DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
-                               graph.reduceOnEdges(new 
SelectMinWeightNeighborNoValue(), EdgeDirection.OUT);
-               verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
-               env.execute();
-
-               expectedResult = "1,2\n" +
-                               "2,3\n" + 
-                               "3,4\n" +
-                               "4,5\n" + 
-                               "5,1\n";
-       }
-
-       @Test
-       public void testLowestWeightInNeighborNoValue() throws Exception {
-               /*
-                * Get the lowest-weight in-neighbor
-                * for each vertex
-         */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-
-               DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
-                               graph.reduceOnEdges(new 
SelectMinWeightInNeighborNoValue(), EdgeDirection.IN);
-               verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
-               env.execute();
-
-               expectedResult = "1,5\n" +
-                               "2,1\n" + 
-                               "3,1\n" +
-                               "4,3\n" + 
-                               "5,3\n";
-       }
-
-       @Test
-       public void testMaxWeightAllNeighbors() throws Exception {
-               /*
-                * Get the maximum weight among all edges
-                * of a vertex
-         */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-
-               DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = 
-                               graph.reduceOnEdges(new 
SelectMaxWeightNeighborNoValue(), EdgeDirection.ALL);
-               verticesWithMaxEdgeWeight.writeAsCsv(resultPath);
-               env.execute();
-
-               expectedResult = "1,51\n" +
-                               "2,23\n" + 
-                               "3,35\n" +
-                               "4,45\n" + 
-                               "5,51\n";
-       }
-
-       @SuppressWarnings("serial")
-       private static final class SelectMinWeightNeighbor implements 
EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
-
-               public Tuple2<Long, Long> iterateEdges(
-                               Vertex<Long, Long> v,
-                               Iterable<Edge<Long, Long>> edges) {
-                       
-                       long weight = Long.MAX_VALUE;
-                       long minNeighorId = 0;
-                       
-                       for (Edge<Long, Long> edge: edges) {
-                               if (edge.getValue() < weight) {
-                                       weight = edge.getValue();
-                                       minNeighorId = edge.getTarget();
-                               }
-                       }
-                       return new Tuple2<Long, Long>(v.getId(), minNeighorId);
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class SelectMaxWeightNeighbor implements 
EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
-
-               public Tuple2<Long, Long> iterateEdges(Vertex<Long, Long> v,
-                               Iterable<Edge<Long, Long>> edges) {
-                       
-                       long weight = Long.MIN_VALUE;
-
-                       for (Edge<Long, Long> edge: edges) {
-                               if (edge.getValue() > weight) {
-                                       weight = edge.getValue();
-                               }
-                       }
-                       return new Tuple2<Long, Long>(v.getId(), weight);
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class SelectMinWeightNeighborNoValue implements 
EdgesFunction<Long, Long, Tuple2<Long, Long>> {
-
-               public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, 
Edge<Long, Long>>> edges) {
-
-                       long weight = Long.MAX_VALUE;
-                       long minNeighorId = 0;
-                       long vertexId = -1;
-                       long i=0;
-
-                       for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
-                               if (edge.f1.getValue() < weight) {
-                                       weight = edge.f1.getValue();
-                                       minNeighorId = edge.f1.getTarget();
-                               }
-                               if (i==0) {
-                                       vertexId = edge.f0;
-                               } i++;
-                       }
-                       return new Tuple2<Long, Long>(vertexId, minNeighorId);
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class SelectMaxWeightNeighborNoValue implements 
EdgesFunction<Long, Long, Tuple2<Long, Long>> {
-
-               public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, 
Edge<Long, Long>>> edges) {
-                       
-                       long weight = Long.MIN_VALUE;
-                       long vertexId = -1;
-                       long i=0;
-
-                       for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
-                               if (edge.f1.getValue() > weight) {
-                                       weight = edge.f1.getValue();
-                               }
-                               if (i==0) {
-                                       vertexId = edge.f0;
-                               } i++;
-                       }
-                       return new Tuple2<Long, Long>(vertexId, weight);
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class SelectMinWeightInNeighbor implements 
EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
-
-               public Tuple2<Long, Long> iterateEdges(
-                               Vertex<Long, Long> v,
-                               Iterable<Edge<Long, Long>> edges) {
-                       
-                       long weight = Long.MAX_VALUE;
-                       long minNeighorId = 0;
-                       
-                       for (Edge<Long, Long> edge: edges) {
-                               if (edge.getValue() < weight) {
-                                       weight = edge.getValue();
-                                       minNeighorId = edge.getSource();
-                               }
-                       }
-                       return new Tuple2<Long, Long>(v.getId(), minNeighorId);
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class SelectMinWeightInNeighborNoValue implements 
EdgesFunction<Long, Long, Tuple2<Long, Long>> {
-
-               public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, 
Edge<Long, Long>>> edges) {
-                       
-                       long weight = Long.MAX_VALUE;
-                       long minNeighorId = 0;
-                       long vertexId = -1;
-                       long i=0;
-
-                       for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
-                               if (edge.f1.getValue() < weight) {
-                                       weight = edge.f1.getValue();
-                                       minNeighorId = edge.f1.getSource();
-                               }
-                               if (i==0) {
-                                       vertexId = edge.f0;
-                               } i++;
-                       }
-                       return new Tuple2<Long, Long>(vertexId, minNeighorId);
-               }
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java
deleted file mode 100644
index e64eacf..0000000
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import java.util.Iterator;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.NeighborsFunction;
-import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestReduceOnNeighborMethods extends MultipleProgramsTestBase {
-
-       public 
TestReduceOnNeighborMethods(MultipleProgramsTestBase.ExecutionMode mode){
-               super(mode);
-       }
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expectedResult, resultPath);
-       }
-
-       @Test
-       public void testSumOfOutNeighbors() throws Exception {
-               /*
-                * Get the sum of out-neighbor values
-                * for each vertex
-         */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-
-               DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues 
= 
-                               graph.reduceOnNeighbors(new SumOutNeighbors(), 
EdgeDirection.OUT);
-
-               verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
-               env.execute();
-               expectedResult = "1,5\n" +
-                               "2,3\n" + 
-                               "3,9\n" +
-                               "4,5\n" + 
-                               "5,1\n";
-       }
-
-       @Test
-       public void testSumOfInNeighbors() throws Exception {
-               /*
-                * Get the sum of in-neighbor values
-                * times the edge weights for each vertex
-         */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-
-               DataSet<Tuple2<Long, Long>> verticesWithSum = 
-                               graph.reduceOnNeighbors(new SumInNeighbors(), 
EdgeDirection.IN);                
-
-               verticesWithSum.writeAsCsv(resultPath);
-               env.execute();
-               expectedResult = "1,255\n" +
-                               "2,12\n" + 
-                               "3,59\n" +
-                               "4,102\n" + 
-                               "5,285\n";
-       }
-
-       @Test
-       public void testSumOfOAllNeighbors() throws Exception {
-               /*
-                * Get the sum of all neighbor values
-                * including own vertex value
-                * for each vertex
-         */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-
-               DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues 
= 
-                               graph.reduceOnNeighbors(new SumAllNeighbors(), 
EdgeDirection.ALL);
-
-               verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
-               env.execute();
-
-               expectedResult = "1,11\n" +
-                               "2,6\n" + 
-                               "3,15\n" +
-                               "4,12\n" + 
-                               "5,13\n";
-       }
-
-       @Test
-       public void testSumOfOutNeighborsNoValue() throws Exception {
-               /*
-                * Get the sum of out-neighbor values
-                * for each vertex
-         */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-
-               DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues 
= 
-                               graph.reduceOnNeighbors(new 
SumOutNeighborsNoValue(), EdgeDirection.OUT);
-
-               verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
-               env.execute();
-
-               expectedResult = "1,5\n" +
-                               "2,3\n" + 
-                               "3,9\n" +
-                               "4,5\n" + 
-                               "5,1\n";
-       }
-
-       @Test
-       public void testSumOfInNeighborsNoValue() throws Exception {
-               /*
-                * Get the sum of in-neighbor values
-                * times the edge weights for each vertex
-         */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-
-               DataSet<Tuple2<Long, Long>> verticesWithSum = 
-                               graph.reduceOnNeighbors(new 
SumInNeighborsNoValue(), EdgeDirection.IN);
-
-               verticesWithSum.writeAsCsv(resultPath);
-               env.execute();
-       
-               expectedResult = "1,255\n" +
-                               "2,12\n" + 
-                               "3,59\n" +
-                               "4,102\n" + 
-                               "5,285\n";
-       }
-
-       @Test
-       public void testSumOfAllNeighborsNoValue() throws Exception {
-               /*
-                * Get the sum of all neighbor values
-                * for each vertex
-         */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-
-               DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues 
= 
-                               graph.reduceOnNeighbors(new 
SumAllNeighborsNoValue(), EdgeDirection.ALL);
-
-               verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
-               env.execute();
-       
-               expectedResult = "1,10\n" +
-                               "2,4\n" + 
-                               "3,12\n" +
-                               "4,8\n" + 
-                               "5,8\n";
-       }
-
-       @SuppressWarnings("serial")
-       private static final class SumOutNeighbors implements 
NeighborsFunctionWithVertexValue<Long, Long, Long, 
-       Tuple2<Long, Long>> {
-
-               public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> 
vertex,
-                               Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, 
Long>>> neighbors) {
-                       
-                       long sum = 0;
-                       for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> 
neighbor : neighbors) {
-                               sum += neighbor.f1.getValue();
-                       }
-                       return new Tuple2<Long, Long>(vertex.getId(), sum);
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class SumInNeighbors implements 
NeighborsFunctionWithVertexValue<Long, Long, Long, 
-               Tuple2<Long, Long>> {
-               
-               public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> 
vertex,
-                               Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, 
Long>>> neighbors) {
-               
-                       long sum = 0;
-                       for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> 
neighbor : neighbors) {
-                               sum += neighbor.f0.getValue() * 
neighbor.f1.getValue();
-                       }
-                       return new Tuple2<Long, Long>(vertex.getId(), sum);
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class SumAllNeighbors implements 
NeighborsFunctionWithVertexValue<Long, Long, Long, 
-               Tuple2<Long, Long>> {
-
-               public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> 
vertex,
-               Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> 
neighbors) {
-       
-                       long sum = 0;
-                       for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> 
neighbor : neighbors) {
-                               sum += neighbor.f1.getValue();
-                       }
-                       return new Tuple2<Long, Long>(vertex.getId(), sum + 
vertex.getValue());
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class SumOutNeighborsNoValue implements 
NeighborsFunction<Long, Long, Long, 
-               Tuple2<Long, Long>> {
-
-               public Tuple2<Long, Long> iterateNeighbors(
-                               Iterable<Tuple3<Long, Edge<Long, Long>, 
Vertex<Long, Long>>> neighbors) {
-
-                       long sum = 0;
-                       Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next 
= null;
-                       Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, 
Long>>> neighborsIterator =
-                                       neighbors.iterator();
-                       while(neighborsIterator.hasNext()) {
-                               next = neighborsIterator.next();
-                               sum += next.f2.getValue();
-                       }
-                       return new Tuple2<Long, Long>(next.f0, sum);
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class SumInNeighborsNoValue implements 
NeighborsFunction<Long, Long, Long, 
-               Tuple2<Long, Long>> {
-               
-               public Tuple2<Long, Long> iterateNeighbors(
-                               Iterable<Tuple3<Long, Edge<Long, Long>, 
Vertex<Long, Long>>> neighbors) {
-               
-                       long sum = 0;
-                       Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next 
= null;
-                       Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, 
Long>>> neighborsIterator =
-                                       neighbors.iterator();
-                       while(neighborsIterator.hasNext()) {
-                               next = neighborsIterator.next();
-                               sum += next.f2.getValue() * next.f1.getValue();
-                       }
-                       return new Tuple2<Long, Long>(next.f0, sum);
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class SumAllNeighborsNoValue implements 
NeighborsFunction<Long, Long, Long, 
-               Tuple2<Long, Long>> {
-
-               public Tuple2<Long, Long> iterateNeighbors(
-                               Iterable<Tuple3<Long, Edge<Long, Long>, 
Vertex<Long, Long>>> neighbors) {
-       
-                       long sum = 0;
-                       Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next 
= null;
-                       Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, 
Long>>> neighborsIterator =
-                                       neighbors.iterator();
-                       while(neighborsIterator.hasNext()) {
-                               next = neighborsIterator.next();
-                               sum += next.f2.getValue();
-                       }
-                       return new Tuple2<Long, Long>(next.f0, sum);
-               }
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestVertexCentricConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestVertexCentricConnectedComponents.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestVertexCentricConnectedComponents.java
deleted file mode 100644
index 614ddd5..0000000
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestVertexCentricConnectedComponents.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.graph.test;
-
-import java.io.BufferedReader;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-import org.apache.flink.test.testdata.ConnectedComponentsData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.types.NullValue;
-
-@SuppressWarnings("serial")
-public class TestVertexCentricConnectedComponents extends JavaProgramTestBase {
-
-       private static final long SEED = 9487520347802987L;
-       
-       private static final int NUM_VERTICES = 1000;
-       
-       private static final int NUM_EDGES = 10000;
-
-       private String resultPath;
-       
-       
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempFilePath("results");
-       }
-       
-       @Override
-       protected void testProgram() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES);
-               DataSet<String> edgeString = 
env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, 
NUM_VERTICES, SEED).split("\n"));
-               
-               DataSet<Edge<Long, NullValue>> edges = edgeString.map(new 
EdgeParser());
-               
-               DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new 
IdAssigner());
-               Graph<Long, Long, NullValue> graph = 
Graph.fromDataSet(initialVertices, edges, env); 
-               
-               Graph<Long, Long, NullValue> result = 
graph.runVertexCentricIteration(new CCUpdater(), new CCMessager(), 100);
-               
-               result.getVertices().writeAsCsv(resultPath, "\n", " ");
-               env.execute();
-       }
-       
-       public static final class CCUpdater extends VertexUpdateFunction<Long, 
Long, Long> {
-               @Override
-               public void updateVertex(Long vertexKey, Long vertexValue, 
MessageIterator<Long> inMessages) {
-                       long min = Long.MAX_VALUE;
-                       for (long msg : inMessages) {
-                               min = Math.min(min, msg);
-                       }
-                       if (min < vertexValue) {
-                               setNewVertexValue(min);
-                       }
-               }
-       }
-       
-       public static final class CCMessager extends MessagingFunction<Long, 
Long, Long, NullValue> {
-               @Override
-               public void sendMessages(Long vertexId, Long componentId) {
-                       sendMessageToAllNeighbors(componentId);
-               }
-       }
-       
-       /**
-        * A map function that takes a Long value and creates a 2-tuple out of 
it:
-        * <pre>(Long value) -> (value, value)</pre>
-        */
-       public static final class IdAssigner implements MapFunction<Long, 
Vertex<Long, Long>> {
-               @Override
-               public Vertex<Long, Long> map(Long value) {
-                       return new Vertex<Long, Long>(value, value);
-               }
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               for (BufferedReader reader : getResultReader(resultPath)) {
-                       ConnectedComponentsData.checkOddEvenResult(reader);
-               }
-       }
-       
-       public static final class EdgeParser extends RichMapFunction<String, 
Edge<Long, NullValue>> {
-               public Edge<Long, NullValue> map(String value) {
-                       String[] nums = value.split(" ");
-                       return new Edge<Long, 
NullValue>(Long.parseLong(nums[0]), Long.parseLong(nums[1]), 
-                                       NullValue.getInstance());
-               }
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestWeaklyConnected.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestWeaklyConnected.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestWeaklyConnected.java
deleted file mode 100644
index f5b6d9d..0000000
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestWeaklyConnected.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestWeaklyConnected extends MultipleProgramsTestBase {
-
-       public TestWeaklyConnected(MultipleProgramsTestBase.ExecutionMode mode){
-               super(mode);
-       }
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expectedResult, resultPath);
-       }
-
-       @Test
-       public void testWithConnectedDirected() throws Exception {
-               /*
-                * Test isWeaklyConnected() with a connected, directed graph
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), env);
-               
-               graph.isWeaklyConnected(10).writeAsText(resultPath);
-               
-               env.execute();
-               expectedResult = "true\n";
-       }
-
-       @Test
-       public void testWithDisconnectedDirected() throws Exception {
-               /*
-                * Test isWeaklyConnected() with a disconnected, directed graph
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               
TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
-               
-               graph.isWeaklyConnected(10).writeAsText(resultPath);
-               
-               env.execute();
-               expectedResult = "false\n";
-       }
-
-       @Test
-       public void testWithConnectedUndirected() throws Exception {
-               /*
-                * Test isWeaklyConnected() with a connected, undirected graph
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               TestGraphUtils.getLongLongEdgeData(env), 
env).getUndirected();
-               
-               graph.isWeaklyConnected(10).writeAsText(resultPath);
-               
-               env.execute();
-               expectedResult = "true\n";
-       }
-
-       @Test
-       public void testWithDisconnectedUndirected() throws Exception {
-               /*
-                * Test isWeaklyConnected() with a disconnected, undirected 
graph
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                               
TestGraphUtils.getDisconnectedLongLongEdgeData(env), env).getUndirected();
-               
-               graph.isWeaklyConnected(10).writeAsText(resultPath);
-               
-               env.execute();
-               expectedResult = "false\n";
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java
new file mode 100644
index 0000000..380e027
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.graph.test;
+
+import java.io.BufferedReader;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.test.testdata.ConnectedComponentsData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.types.NullValue;
+
+@SuppressWarnings("serial")
+public class VertexCentricConnectedComponentsITCase extends 
JavaProgramTestBase {
+
+       private static final long SEED = 9487520347802987L;
+       
+       private static final int NUM_VERTICES = 1000;
+       
+       private static final int NUM_EDGES = 10000;
+
+       private String resultPath;
+       
+       
+       @Override
+       protected void preSubmit() throws Exception {
+               resultPath = getTempFilePath("results");
+       }
+       
+       @Override
+       protected void testProgram() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES);
+               DataSet<String> edgeString = 
env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, 
NUM_VERTICES, SEED).split("\n"));
+               
+               DataSet<Edge<Long, NullValue>> edges = edgeString.map(new 
EdgeParser());
+               
+               DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new 
IdAssigner());
+               Graph<Long, Long, NullValue> graph = 
Graph.fromDataSet(initialVertices, edges, env); 
+               
+               Graph<Long, Long, NullValue> result = 
graph.runVertexCentricIteration(new CCUpdater(), new CCMessager(), 100);
+               
+               result.getVertices().writeAsCsv(resultPath, "\n", " ");
+               env.execute();
+       }
+       
+       public static final class CCUpdater extends VertexUpdateFunction<Long, 
Long, Long> {
+               @Override
+               public void updateVertex(Long vertexKey, Long vertexValue, 
MessageIterator<Long> inMessages) {
+                       long min = Long.MAX_VALUE;
+                       for (long msg : inMessages) {
+                               min = Math.min(min, msg);
+                       }
+                       if (min < vertexValue) {
+                               setNewVertexValue(min);
+                       }
+               }
+       }
+       
+       public static final class CCMessager extends MessagingFunction<Long, 
Long, Long, NullValue> {
+               @Override
+               public void sendMessages(Long vertexId, Long componentId) {
+                       sendMessageToAllNeighbors(componentId);
+               }
+       }
+       
+       /**
+        * A map function that takes a Long value and creates a 2-tuple out of 
it:
+        * <pre>(Long value) -> (value, value)</pre>
+        */
+       public static final class IdAssigner implements MapFunction<Long, 
Vertex<Long, Long>> {
+               @Override
+               public Vertex<Long, Long> map(Long value) {
+                       return new Vertex<Long, Long>(value, value);
+               }
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               for (BufferedReader reader : getResultReader(resultPath)) {
+                       ConnectedComponentsData.checkOddEvenResult(reader);
+               }
+       }
+       
+       public static final class EdgeParser extends RichMapFunction<String, 
Edge<Long, NullValue>> {
+               public Edge<Long, NullValue> map(String value) {
+                       String[] nums = value.split(" ");
+                       return new Edge<Long, 
NullValue>(Long.parseLong(nums[0]), Long.parseLong(nums[1]), 
+                                       NullValue.getInstance());
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java
new file mode 100644
index 0000000..dd08f47
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class WeaklyConnectedITCase extends MultipleProgramsTestBase {
+
+       public WeaklyConnectedITCase(MultipleProgramsTestBase.ExecutionMode 
mode){
+               super(mode);
+       }
+
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception{
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+
+       @After
+       public void after() throws Exception{
+               compareResultsByLinesInMemory(expectedResult, resultPath);
+       }
+
+       @Test
+       public void testWithConnectedDirected() throws Exception {
+               /*
+                * Test isWeaklyConnected() with a connected, directed graph
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                               TestGraphUtils.getLongLongEdgeData(env), env);
+               
+               graph.isWeaklyConnected(10).writeAsText(resultPath);
+               
+               env.execute();
+               expectedResult = "true\n";
+       }
+
+       @Test
+       public void testWithDisconnectedDirected() throws Exception {
+               /*
+                * Test isWeaklyConnected() with a disconnected, directed graph
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                               
TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
+               
+               graph.isWeaklyConnected(10).writeAsText(resultPath);
+               
+               env.execute();
+               expectedResult = "false\n";
+       }
+
+       @Test
+       public void testWithConnectedUndirected() throws Exception {
+               /*
+                * Test isWeaklyConnected() with a connected, undirected graph
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                               TestGraphUtils.getLongLongEdgeData(env), 
env).getUndirected();
+               
+               graph.isWeaklyConnected(10).writeAsText(resultPath);
+               
+               env.execute();
+               expectedResult = "true\n";
+       }
+
+       @Test
+       public void testWithDisconnectedUndirected() throws Exception {
+               /*
+                * Test isWeaklyConnected() with a disconnected, undirected 
graph
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                               
TestGraphUtils.getDisconnectedLongLongEdgeData(env), env).getUndirected();
+               
+               graph.isWeaklyConnected(10).writeAsText(resultPath);
+               
+               env.execute();
+               expectedResult = "false\n";
+       }
+}

Reply via email to