http://git-wip-us.apache.org/repos/asf/flink/blob/e6b9cecd/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
index 87f137f..0be97be 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
@@ -22,582 +22,498 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
+import org.apache.flink.graph.utils.EdgeToTuple3Map;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
 @RunWith(Parameterized.class)
-public class TestJoinWithEdges extends JavaProgramTestBase {
+public class TestJoinWithEdges extends MultipleProgramsTestBase {
 
-    private static int NUM_PROGRAMS = 15;
+       public TestJoinWithEdges(MultipleProgramsTestBase.ExecutionMode mode){
+               super(mode);
+       }
 
-    private int curProgId = config.getInteger("ProgramId", -1);
     private String resultPath;
     private String expectedResult;
 
-    public TestJoinWithEdges(Configuration config) {
-        super(config);
+    @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception{
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+
+       @After
+       public void after() throws Exception{
+               compareResultsByLinesInMemory(expectedResult, resultPath);
+       }
+
+       @Test
+       public void testWithEdgesInputDataset() throws Exception {
+               /*
+                * Test joinWithEdges with the input DataSet parameter identical
+                * to the edge DataSet
+                */
+        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges()
+                        .map(new EdgeToTuple3Map<Long, Long>()), new 
AddValuesMapper());
+
+        result.getEdges().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,2,24\n" +
+                       "1,3,26\n" +
+                       "2,3,46\n" +
+                       "3,4,68\n" +
+                       "3,5,70\n" +
+                       "4,5,90\n" +
+                       "5,1,102\n";
+    }
+
+       @Test
+       public void testWithLessElements() throws Exception {
+           /*
+                * Test joinWithEdges with the input DataSet passed as a parameter containing
+                * less elements than the edge DataSet, but of the same type
+                */
+        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        Graph<Long, Long, Long> result = 
graph.joinWithEdges(graph.getEdges().first(3)
+                        .map(new EdgeToTuple3Map<Long, Long>()), new 
AddValuesMapper());
+
+        result.getEdges().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,2,24\n" +
+                       "1,3,26\n" +
+                       "2,3,46\n" +
+                       "3,4,34\n" +
+                       "3,5,35\n" +
+                       "4,5,45\n" +
+                       "5,1,51\n";
     }
 
-    @Override
-    protected void preSubmit() throws Exception {
-        resultPath = getTempDirPath("result");
+       @Test
+       public void testWithLessElementsDifferentType() throws Exception {
+           /*
+                * Test joinWithEdges with the input DataSet passed as a parameter containing
+                * less elements than the edge DataSet and of a different type(Boolean)
+                */
+        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        Graph<Long, Long, Long> result = 
graph.joinWithEdges(graph.getEdges().first(3)
+                        .map(new BooleanEdgeValueMapper()), new 
DoubleIfTrueMapper());
+
+        result.getEdges().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,2,24\n" +
+                       "1,3,26\n" +
+                       "2,3,46\n" +
+                       "3,4,34\n" +
+                       "3,5,35\n" +
+                       "4,5,45\n" +
+                       "5,1,51\n";
     }
 
-    @Override
-    protected void testProgram() throws Exception {
-        expectedResult = GraphProgs.runProgram(curProgId, resultPath);
+       @Test
+       public void testWithNoCommonKeys() throws Exception {
+           /*
+                * Test joinWithEdges with the input DataSet containing different keys than the edge DataSet
+                * - the iterator becomes empty.
+                */
+        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        Graph<Long, Long, Long> result = 
graph.joinWithEdges(TestGraphUtils.getLongLongLongTuple3Data(env),
+                new DoubleValueMapper());
+
+        result.getEdges().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,2,24\n" +
+                       "1,3,26\n" +
+                       "2,3,46\n" +
+                       "3,4,68\n" +
+                       "3,5,35\n" +
+                       "4,5,45\n" +
+                       "5,1,51\n";
     }
 
-    @Override
-    protected void postSubmit() throws Exception {
-        compareResultsByLinesInMemory(expectedResult, resultPath);
+       @Test
+       public void testWithCustomType() throws Exception {
+           /*
+            * Test joinWithEdges with a DataSet containing custom parametrised type input values
+                */
+        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        Graph<Long, Long, Long> result = 
graph.joinWithEdges(TestGraphUtils.getLongLongCustomTuple3Data(env),
+                new CustomValueMapper());
+
+        result.getEdges().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,2,10\n" +
+                       "1,3,20\n" +
+                       "2,3,30\n" +
+                       "3,4,40\n" +
+                       "3,5,35\n" +
+                       "4,5,45\n" +
+                       "5,1,51\n";
     }
 
-    @Parameterized.Parameters
-    public static Collection<Object[]> getConfigurations() throws IOException {
+       @Test
+       public void testWithEdgesOnSource() throws Exception {
+           /*
+                * Test joinWithEdgesOnSource with the input DataSet parameter identical
+                * to the edge DataSet
+                */
+        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnSource(graph.getEdges()
+                        .map(new ProjectSourceAndValueMapper()), new 
AddValuesMapper());
+
+        result.getEdges().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,2,24\n" +
+                       "1,3,25\n" +
+                       "2,3,46\n" +
+                       "3,4,68\n" +
+                       "3,5,69\n" +
+                       "4,5,90\n" +
+                       "5,1,102\n";
+    }
 
-        LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+       @Test
+       public void testOnSourceWithLessElements() throws Exception {
+           /*
+                * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing
+                * less elements than the edge DataSet, but of the same type
+                */
+        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnSource(graph.getEdges().first(3)
+                        .map(new ProjectSourceAndValueMapper()), new 
AddValuesMapper());
+
+        result.getEdges().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,2,24\n" +
+                       "1,3,25\n" +
+                       "2,3,46\n" +
+                       "3,4,34\n" +
+                       "3,5,35\n" +
+                       "4,5,45\n" +
+                       "5,1,51\n";
+    }
 
-        for(int i=1; i <= NUM_PROGRAMS; i++) {
-            Configuration config = new Configuration();
-            config.setInteger("ProgramId", i);
-            tConfigs.add(config);
-        }
+       @Test
+       public void testOnSourceWithDifferentType() throws Exception {
+           /*
+                * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing
+                * less elements than the edge DataSet and of a different type(Boolean)
+                */
+        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnSource(graph.getEdges().first(3)
+                        .map(new ProjectSourceWithTrueMapper()), new 
DoubleIfTrueMapper());
+
+        result.getEdges().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,2,24\n" +
+                       "1,3,26\n" +
+                       "2,3,46\n" +
+                       "3,4,34\n" +
+                       "3,5,35\n" +
+                       "4,5,45\n" +
+                       "5,1,51\n";
+    }
+
+       @Test
+       public void testOnSourceWithNoCommonKeys() throws Exception {
+           /*
+                * Test joinWithEdgesOnSource with the input DataSet containing different keys than the edge DataSet
+                * - the iterator becomes empty.
+                */
+        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnSource(TestGraphUtils.getLongLongTuple2SourceData(env),
+                new DoubleValueMapper());
+
+        result.getEdges().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,2,20\n" +
+                       "1,3,20\n" +
+                       "2,3,60\n" +
+                       "3,4,80\n" +
+                       "3,5,80\n" +
+                       "4,5,120\n" +
+                       "5,1,51\n";
+    }
+
+       @Test
+       public void testOnSourceWithCustom() throws Exception {
+           /*
+            * Test joinWithEdgesOnSource with a DataSet containing custom parametrised type input values
+                */
+        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnSource(TestGraphUtils.getLongCustomTuple2SourceData(env),
+                new CustomValueMapper());
+
+        result.getEdges().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,2,10\n" +
+                       "1,3,10\n" +
+                       "2,3,30\n" +
+                       "3,4,40\n" +
+                       "3,5,40\n" +
+                       "4,5,45\n" +
+                       "5,1,51\n";
+    }
 
-        return toParameterList(tConfigs);
+       @Test
+       public void testWithEdgesOnTarget() throws Exception {
+    /*
+        * Test joinWithEdgesOnTarget with the input DataSet parameter identical
+        * to the edge DataSet
+        */
+        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnTarget(graph.getEdges()
+                        .map(new ProjectTargetAndValueMapper()), new 
AddValuesMapper());
+
+        result.getEdges().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,2,24\n" +
+                       "1,3,26\n" +
+                       "2,3,36\n" +
+                       "3,4,68\n" +
+                       "3,5,70\n" +
+                       "4,5,80\n" +
+                       "5,1,102\n";
     }
 
-    private static class GraphProgs {
-
-        @SuppressWarnings("serial")
-        public static String runProgram(int progId, String resultPath) throws 
Exception {
-
-            switch (progId) {
-                case 1: {
-                               /*
-                                * Test joinWithEdges with the input DataSet parameter identical
-                                * to the edge DataSet
-                                */
-                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                    Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                            TestGraphUtils.getLongLongEdgeData(env), env);
-
-                    Graph<Long, Long, Long> result = 
graph.joinWithEdges(graph.getEdges()
-                                    .map(new MapFunction<Edge<Long, Long>, 
Tuple3<Long, Long, Long>>() {
-                                        @Override
-                                        public Tuple3<Long, Long, Long> 
map(Edge<Long, Long> edge) throws Exception {
-                                            return new Tuple3<Long, Long, 
Long>(edge.getSource(),
-                                                    edge.getTarget(), 
edge.getValue());
-                                        }
-                                    }),
-                            new MapFunction<Tuple2<Long, Long>, Long>() {
-
-                                @Override
-                                public Long map(Tuple2<Long, Long> tuple) 
throws Exception {
-                                    return tuple.f0 + tuple.f1;
-                                }
-                            });
-
-                    result.getEdges().writeAsCsv(resultPath);
-                    env.execute();
-
-                    return "1,2,24\n" +
-                            "1,3,26\n" +
-                            "2,3,46\n" +
-                            "3,4,68\n" +
-                            "3,5,70\n" +
-                            "4,5,90\n" +
-                            "5,1,102\n";
-                }
-                case 2: {
-                /*
-                                * Test joinWithEdges with the input DataSet passed as a parameter containing
-                                * less elements than the edge DataSet, but of the same type
-                                */
-                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                    Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                            TestGraphUtils.getLongLongEdgeData(env), env);
-
-                    Graph<Long, Long, Long> result = 
graph.joinWithEdges(graph.getEdges().first(3)
-                                    .map(new MapFunction<Edge<Long, Long>, 
Tuple3<Long, Long, Long>>() {
-                                        @Override
-                                        public Tuple3<Long, Long, Long> 
map(Edge<Long, Long> edge) throws Exception {
-                                            return new Tuple3<Long, Long, 
Long>(edge.getSource(),
-                                                    edge.getTarget(), 
edge.getValue());
-                                        }
-                                    }),
-                            new MapFunction<Tuple2<Long, Long>, Long>() {
-
-                                @Override
-                                public Long map(Tuple2<Long, Long> tuple) 
throws Exception {
-                                    return tuple.f0 + tuple.f1;
-                                }
-                            });
-
-                    result.getEdges().writeAsCsv(resultPath);
-                    env.execute();
-
-                    return "1,2,24\n" +
-                            "1,3,26\n" +
-                            "2,3,46\n" +
-                            "3,4,34\n" +
-                            "3,5,35\n" +
-                            "4,5,45\n" +
-                            "5,1,51\n";
-                }
-                case 3: {
-                /*
-                                * Test joinWithEdges with the input DataSet passed as a parameter containing
-                                * less elements than the edge DataSet and of a different type(Boolean)
-                                */
-                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                    Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                            TestGraphUtils.getLongLongEdgeData(env), env);
-
-                    Graph<Long, Long, Long> result = 
graph.joinWithEdges(graph.getEdges().first(3)
-                                    .map(new MapFunction<Edge<Long, Long>, 
Tuple3<Long, Long, Boolean>>() {
-                                        @Override
-                                        public Tuple3<Long, Long, Boolean> 
map(Edge<Long, Long> edge) throws Exception {
-                                            return new Tuple3<Long, Long, 
Boolean>(edge.getSource(),
-                                                    edge.getTarget(), true);
-                                        }
-                                    }),
-                            new MapFunction<Tuple2<Long, Boolean>, Long>() {
-
-                                @Override
-                                public Long map(Tuple2<Long, Boolean> tuple) 
throws Exception {
-                                    if(tuple.f1) {
-                                        return tuple.f0 * 2;
-                                    }
-                                    else {
-                                        return tuple.f0;
-                                    }
-                                }
-                            });
-
-                    result.getEdges().writeAsCsv(resultPath);
-                    env.execute();
-
-                    return "1,2,24\n" +
-                            "1,3,26\n" +
-                            "2,3,46\n" +
-                            "3,4,34\n" +
-                            "3,5,35\n" +
-                            "4,5,45\n" +
-                            "5,1,51\n";
-                }
-                case 4: {
-                /*
-                                * Test joinWithEdges with the input DataSet containing different keys than the edge DataSet
-                                * - the iterator becomes empty.
-                                */
-                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                    Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                            TestGraphUtils.getLongLongEdgeData(env), env);
-
-                    Graph<Long, Long, Long> result = 
graph.joinWithEdges(TestGraphUtils.getLongLongLongTuple3Data(env),
-                            new MapFunction<Tuple2<Long, Long>, Long>() {
-
-                                @Override
-                                public Long map(Tuple2<Long, Long> tuple) 
throws Exception {
-                                    return tuple.f1 * 2;
-                                }
-                            });
-
-                    result.getEdges().writeAsCsv(resultPath);
-                    env.execute();
-
-                    return "1,2,24\n" +
-                            "1,3,26\n" +
-                            "2,3,46\n" +
-                            "3,4,68\n" +
-                            "3,5,35\n" +
-                            "4,5,45\n" +
-                            "5,1,51\n";
-                }
-                case 5: {
-                /*
-                    * Test joinWithEdges with a DataSet containing custom parametrised type input values
-                        */
-                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                    Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                            TestGraphUtils.getLongLongEdgeData(env), env);
-
-                    Graph<Long, Long, Long> result = 
graph.joinWithEdges(TestGraphUtils.getLongLongCustomTuple3Data(env),
-                            new MapFunction<Tuple2<Long, 
TestGraphUtils.DummyCustomParameterizedType<Float>>, Long>() {
-                                public Long map(Tuple2<Long, 
TestGraphUtils.DummyCustomParameterizedType<Float>> tuple) throws Exception {
-                                    return (long) tuple.f1.getIntField();
-                                }
-                            });
-
-                    result.getEdges().writeAsCsv(resultPath);
-                    env.execute();
-
-                    return "1,2,10\n" +
-                            "1,3,20\n" +
-                            "2,3,30\n" +
-                            "3,4,40\n" +
-                            "3,5,35\n" +
-                            "4,5,45\n" +
-                            "5,1,51\n";
-                }
-                case 6: {
-                /*
-                                * Test joinWithEdgesOnSource with the input DataSet parameter identical
-                                * to the edge DataSet
-                                */
-                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                    Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                            TestGraphUtils.getLongLongEdgeData(env), env);
-
-                    Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnSource(graph.getEdges()
-                                    .map(new MapFunction<Edge<Long, Long>, 
Tuple2<Long, Long>>() {
-                                        @Override
-                                        public Tuple2<Long, Long> 
map(Edge<Long, Long> edge) throws Exception {
-                                            return new Tuple2<Long, 
Long>(edge.getSource(), edge.getValue());
-                                        }
-                                    }),
-                            new MapFunction<Tuple2<Long, Long>, Long>() {
-
-                                @Override
-                                public Long map(Tuple2<Long, Long> tuple) 
throws Exception {
-                                    return tuple.f0 + tuple.f1;
-                                }
-                            });
-
-                    result.getEdges().writeAsCsv(resultPath);
-                    env.execute();
-
-                    return "1,2,24\n" +
-                            "1,3,25\n" +
-                            "2,3,46\n" +
-                            "3,4,68\n" +
-                            "3,5,69\n" +
-                            "4,5,90\n" +
-                            "5,1,102\n";
-                }
-                case 7: {
-                /*
-                                * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing
-                                * less elements than the edge DataSet, but of the same type
-                                */
-                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                    Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                            TestGraphUtils.getLongLongEdgeData(env), env);
-
-                    Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnSource(graph.getEdges().first(3)
-                                    .map(new MapFunction<Edge<Long, Long>, 
Tuple2<Long, Long>>() {
-                                        @Override
-                                        public Tuple2<Long, Long> 
map(Edge<Long, Long> edge) throws Exception {
-                                            return new Tuple2<Long, 
Long>(edge.getSource(), edge.getValue());
-                                        }
-                                    }),
-                            new MapFunction<Tuple2<Long, Long>, Long>() {
-
-                                @Override
-                                public Long map(Tuple2<Long, Long> tuple) 
throws Exception {
-                                    return tuple.f0 + tuple.f1;
-                                }
-                            });
-
-                    result.getEdges().writeAsCsv(resultPath);
-                    env.execute();
-
-                    return "1,2,24\n" +
-                            "1,3,25\n" +
-                            "2,3,46\n" +
-                            "3,4,34\n" +
-                            "3,5,35\n" +
-                            "4,5,45\n" +
-                            "5,1,51\n";
-                }
-                case 8: {
-                /*
-                                * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing
-                                * less elements than the edge DataSet and of a different type(Boolean)
-                                */
-                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                    Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                            TestGraphUtils.getLongLongEdgeData(env), env);
-
-                    Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnSource(graph.getEdges().first(3)
-                                    .map(new MapFunction<Edge<Long, Long>, 
Tuple2<Long, Boolean>>() {
-                                        @Override
-                                        public Tuple2<Long, Boolean> 
map(Edge<Long, Long> edge) throws Exception {
-                                            return new Tuple2<Long, 
Boolean>(edge.getSource(), true);
-                                        }
-                                    }),
-                            new MapFunction<Tuple2<Long, Boolean>, Long>() {
-
-                                @Override
-                                public Long map(Tuple2<Long, Boolean> tuple) 
throws Exception {
-                                    if (tuple.f1) {
-                                        return tuple.f0 * 2;
-                                    } else {
-                                        return tuple.f0;
-                                    }
-                                }
-                            });
-
-                    result.getEdges().writeAsCsv(resultPath);
-                    env.execute();
-
-                    return "1,2,24\n" +
-                            "1,3,26\n" +
-                            "2,3,46\n" +
-                            "3,4,34\n" +
-                            "3,5,35\n" +
-                            "4,5,45\n" +
-                            "5,1,51\n";
-                }
-                case 9: {
-                /*
-                                * Test joinWithEdgesOnSource with the input DataSet containing different keys than the edge DataSet
-                                * - the iterator becomes empty.
-                                */
-                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                    Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                            TestGraphUtils.getLongLongEdgeData(env), env);
-
-                    Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnSource(TestGraphUtils.getLongLongTuple2SourceData(env),
-                            new MapFunction<Tuple2<Long, Long>, Long>() {
-
-                                @Override
-                                public Long map(Tuple2<Long, Long> tuple) 
throws Exception {
-                                    return tuple.f1 * 2;
-                                }
-                            });
-
-                    result.getEdges().writeAsCsv(resultPath);
-                    env.execute();
-
-                    return "1,2,20\n" +
-                            "1,3,20\n" +
-                            "2,3,60\n" +
-                            "3,4,80\n" +
-                            "3,5,80\n" +
-                            "4,5,120\n" +
-                            "5,1,51\n";
-                }
-                case 10: {
-                /*
-                    * Test joinWithEdgesOnSource with a DataSet containing custom parametrised type input values
-                        */
-                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                    Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                            TestGraphUtils.getLongLongEdgeData(env), env);
-
-                    Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnSource(TestGraphUtils.getLongCustomTuple2SourceData(env),
-                            new MapFunction<Tuple2<Long, 
TestGraphUtils.DummyCustomParameterizedType<Float>>, Long>() {
-                                public Long map(Tuple2<Long, 
TestGraphUtils.DummyCustomParameterizedType<Float>> tuple) throws Exception {
-                                    return (long) tuple.f1.getIntField();
-                                }
-                            });
-
-                    result.getEdges().writeAsCsv(resultPath);
-                    env.execute();
-
-                    return "1,2,10\n" +
-                            "1,3,10\n" +
-                            "2,3,30\n" +
-                            "3,4,40\n" +
-                            "3,5,40\n" +
-                            "4,5,45\n" +
-                            "5,1,51\n";
-                }
-                case 11: {
-                /*
-                                * Test joinWithEdgesOnTarget with the input 
DataSet parameter identical
-                                * to the edge DataSet
-                                */
-                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                    Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                            TestGraphUtils.getLongLongEdgeData(env), env);
-
-                    Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnTarget(graph.getEdges()
-                                    .map(new MapFunction<Edge<Long, Long>, 
Tuple2<Long, Long>>() {
-                                        @Override
-                                        public Tuple2<Long, Long> 
map(Edge<Long, Long> edge) throws Exception {
-                                            return new Tuple2<Long, 
Long>(edge.getTarget(), edge.getValue());
-                                        }
-                                    }),
-                            new MapFunction<Tuple2<Long, Long>, Long>() {
-
-                                @Override
-                                public Long map(Tuple2<Long, Long> tuple) 
throws Exception {
-                                    return tuple.f0 + tuple.f1;
-                                }
-                            });
-
-                    result.getEdges().writeAsCsv(resultPath);
-                    env.execute();
-
-                    return "1,2,24\n" +
-                            "1,3,26\n" +
-                            "2,3,36\n" +
-                            "3,4,68\n" +
-                            "3,5,70\n" +
-                            "4,5,80\n" +
-                            "5,1,102\n";
-                }
-                case 12: {
-                /*
-                                * Test joinWithEdgesOnTarget with the input 
DataSet passed as a parameter containing
-                                * less elements than the edge DataSet, but of 
the same type
-                                */
-                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                    Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                            TestGraphUtils.getLongLongEdgeData(env), env);
-
-                    Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
-                                    .map(new MapFunction<Edge<Long, Long>, 
Tuple2<Long, Long>>() {
-                                        @Override
-                                        public Tuple2<Long, Long> 
map(Edge<Long, Long> edge) throws Exception {
-                                            return new Tuple2<Long, 
Long>(edge.getTarget(), edge.getValue());
-                                        }
-                                    }),
-                            new MapFunction<Tuple2<Long, Long>, Long>() {
-
-                                @Override
-                                public Long map(Tuple2<Long, Long> tuple) 
throws Exception {
-                                    return tuple.f0 + tuple.f1;
-                                }
-                            });
-
-                    result.getEdges().writeAsCsv(resultPath);
-                    env.execute();
-
-                    result.getEdges().writeAsCsv(resultPath);
-                    env.execute();
-
-                    return "1,2,24\n" +
-                            "1,3,26\n" +
-                            "2,3,36\n" +
-                            "3,4,34\n" +
-                            "3,5,35\n" +
-                            "4,5,45\n" +
-                            "5,1,51\n";
-                }
-                case 13: {
-                /*
-                                * Test joinWithEdgesOnTarget with the input 
DataSet passed as a parameter containing
-                                * less elements than the edge DataSet and of a 
different type(Boolean)
-                                */
-                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                    Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                            TestGraphUtils.getLongLongEdgeData(env), env);
-
-                    Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
-                                    .map(new MapFunction<Edge<Long, Long>, 
Tuple2<Long, Boolean>>() {
-                                        @Override
-                                        public Tuple2<Long, Boolean> 
map(Edge<Long, Long> edge) throws Exception {
-                                            return new Tuple2<Long, 
Boolean>(edge.getTarget(), true);
-                                        }
-                                    }),
-                            new MapFunction<Tuple2<Long, Boolean>, Long>() {
-
-                                @Override
-                                public Long map(Tuple2<Long, Boolean> tuple) 
throws Exception {
-                                    if (tuple.f1) {
-                                        return tuple.f0 * 2;
-                                    } else {
-                                        return tuple.f0;
-                                    }
-                                }
-                            });
-
-                    result.getEdges().writeAsCsv(resultPath);
-                    env.execute();
-
-                    return "1,2,24\n" +
-                            "1,3,26\n" +
-                            "2,3,46\n" +
-                            "3,4,34\n" +
-                            "3,5,35\n" +
-                            "4,5,45\n" +
-                            "5,1,51\n";
-                }
-                case 14: {
-                /*
-                                * Test joinWithEdgesOnTarget with the input 
DataSet containing different keys than the edge DataSet
-                                * - the iterator becomes empty.
-                                */
-                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                    Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                            TestGraphUtils.getLongLongEdgeData(env), env);
-
-                    Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnTarget(TestGraphUtils.getLongLongTuple2TargetData(env),
-                            new MapFunction<Tuple2<Long, Long>, Long>() {
-
-                                @Override
-                                public Long map(Tuple2<Long, Long> tuple) 
throws Exception {
-                                    return tuple.f1 * 2;
-                                }
-                            });
-
-                    result.getEdges().writeAsCsv(resultPath);
-                    env.execute();
-
-                    return "1,2,20\n" +
-                            "1,3,40\n" +
-                            "2,3,40\n" +
-                            "3,4,80\n" +
-                            "3,5,35\n" +
-                            "4,5,45\n" +
-                            "5,1,140\n";
-                }
-                case 15: {
-                /*
-                    * Test joinWithEdgesOnTarget with a DataSet containing 
custom parametrised type input values
-                        */
-                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                    Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                            TestGraphUtils.getLongLongEdgeData(env), env);
-
-                    Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnTarget(TestGraphUtils.getLongCustomTuple2TargetData(env),
-                            new MapFunction<Tuple2<Long, 
TestGraphUtils.DummyCustomParameterizedType<Float>>, Long>() {
-                                public Long map(Tuple2<Long, 
TestGraphUtils.DummyCustomParameterizedType<Float>> tuple) throws Exception {
-                                    return (long) tuple.f1.getIntField();
-                                }
-                            });
-
-                    result.getEdges().writeAsCsv(resultPath);
-                    env.execute();
-
-                    return "1,2,10\n" +
-                            "1,3,20\n" +
-                            "2,3,20\n" +
-                            "3,4,40\n" +
-                            "3,5,35\n" +
-                            "4,5,45\n" +
-                            "5,1,51\n";
-                }
-                default:
-                    throw new IllegalArgumentException("Invalid program id");
+       @Test
+       public void testWithOnTargetWithLessElements() throws Exception {
+           /*
+                * Test joinWithEdgesOnTarget with the input DataSet passed as 
a parameter containing
+                * fewer elements than the edge DataSet, but of the same type
+                */
+        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
+                        .map(new ProjectTargetAndValueMapper()), new 
AddValuesMapper());
+
+        result.getEdges().writeAsCsv(resultPath);
+        env.execute();
+
+        result.getEdges().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,2,24\n" +
+                       "1,3,26\n" +
+                       "2,3,36\n" +
+                       "3,4,34\n" +
+                       "3,5,35\n" +
+                       "4,5,45\n" +
+                       "5,1,51\n";
+    }
+
+       @Test
+       public void testOnTargetWithDifferentType() throws Exception {
+           /*
+                * Test joinWithEdgesOnTarget with the input DataSet passed as 
a parameter containing
+                * fewer elements than the edge DataSet and of a different 
type (Boolean)
+                */
+        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
+                        .map(new ProjectTargetWithTrueMapper()), new 
DoubleIfTrueMapper());
+
+        result.getEdges().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,2,24\n" +
+                       "1,3,26\n" +
+                       "2,3,46\n" +
+                       "3,4,34\n" +
+                       "3,5,35\n" +
+                       "4,5,45\n" +
+                       "5,1,51\n";
+    }
+
+       @Test
+       public void testOnTargetWithNoCommonKeys() throws Exception {
+           /*
+                * Test joinWithEdgesOnTarget with the input DataSet containing 
different keys than the edge DataSet
+                * - the iterator becomes empty.
+                */
+        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnTarget(TestGraphUtils.getLongLongTuple2TargetData(env),
+                new DoubleValueMapper());
+
+        result.getEdges().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,2,20\n" +
+                       "1,3,40\n" +
+                       "2,3,40\n" +
+                       "3,4,80\n" +
+                       "3,5,35\n" +
+                       "4,5,45\n" +
+                       "5,1,140\n";
+    }
+
+       @Test
+       public void testOnTargetWithCustom() throws Exception {
+           /*
+            * Test joinWithEdgesOnTarget with a DataSet containing custom 
parametrised type input values
+                */
+        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnTarget(TestGraphUtils.getLongCustomTuple2TargetData(env),
+                new CustomValueMapper());
+
+        result.getEdges().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,2,10\n" +
+                       "1,3,20\n" +
+                       "2,3,20\n" +
+                       "3,4,40\n" +
+                       "3,5,35\n" +
+                       "4,5,45\n" +
+                       "5,1,51\n";
+    }
+
+       @SuppressWarnings("serial")
+       private static final class AddValuesMapper implements 
MapFunction<Tuple2<Long, Long>, Long> {
+               public Long map(Tuple2<Long, Long> tuple) throws Exception {
+                       return tuple.f0 + tuple.f1;
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class BooleanEdgeValueMapper implements 
MapFunction<Edge<Long, Long>, Tuple3<Long, Long, Boolean>> {
+        public Tuple3<Long, Long, Boolean> map(Edge<Long, Long> edge) throws 
Exception {
+            return new Tuple3<Long, Long, Boolean>(edge.getSource(),
+                    edge.getTarget(), true);
+        }
+    }
+
+       @SuppressWarnings("serial")
+       private static final class DoubleIfTrueMapper implements 
MapFunction<Tuple2<Long, Boolean>, Long> {
+        public Long map(Tuple2<Long, Boolean> tuple) throws Exception {
+            if(tuple.f1) {
+                return tuple.f0 * 2;
+            }
+            else {
+                return tuple.f0;
             }
         }
     }
+
+       @SuppressWarnings("serial")
+       private static final class DoubleValueMapper implements 
MapFunction<Tuple2<Long, Long>, Long> {
+        public Long map(Tuple2<Long, Long> tuple) throws Exception {
+            return tuple.f1 * 2;
+        }
+    }
+
+       @SuppressWarnings("serial")
+       private static final class CustomValueMapper implements 
MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> {
+        public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> 
tuple) throws Exception {
+            return (long) tuple.f1.getIntField();
+        }
+    }
+
+       @SuppressWarnings("serial")
+       private static final class ProjectSourceAndValueMapper implements 
MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> {
+        public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception {
+            return new Tuple2<Long, Long>(edge.getSource(), edge.getValue());
+        }
+    }
+
+       @SuppressWarnings("serial")
+       private static final class ProjectSourceWithTrueMapper implements 
MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> {
+        public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws 
Exception {
+            return new Tuple2<Long, Boolean>(edge.getSource(), true);
+        }
+    }
+
+       @SuppressWarnings("serial")
+       private static final class ProjectTargetAndValueMapper implements 
MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> {
+        public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception {
+            return new Tuple2<Long, Long>(edge.getTarget(), edge.getValue());
+        }
+    }
+
+       @SuppressWarnings("serial")
+       private static final class ProjectTargetWithTrueMapper implements 
MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> {
+        public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws 
Exception {
+            return new Tuple2<Long, Boolean>(edge.getTarget(), true);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e6b9cecd/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java
index f10140b..8b0db35 100644
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java
@@ -21,227 +21,198 @@ package org.apache.flink.graph.test;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.graph.utils.VertexToTuple2Map;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
 @RunWith(Parameterized.class)
-public class TestJoinWithVertices extends JavaProgramTestBase {
+public class TestJoinWithVertices extends MultipleProgramsTestBase {
 
-    private static int NUM_PROGRAMS = 5;
+       public TestJoinWithVertices(MultipleProgramsTestBase.ExecutionMode 
mode){
+               super(mode);
+       }
 
-    private int curProgId = config.getInteger("ProgramId", -1);
     private String resultPath;
     private String expectedResult;
 
-    public TestJoinWithVertices(Configuration config) {
-        super(config);
+    @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception{
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+
+       @After
+       public void after() throws Exception{
+               compareResultsByLinesInMemory(expectedResult, resultPath);
+       }
+
+       @Test
+       public void testJoinWithVertexSet() throws Exception {
+               /*
+                * Test joinWithVertices with the input DataSet parameter 
identical
+                * to the vertex DataSet
+                */
+        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        Graph<Long, Long, Long> result = 
graph.joinWithVertices(graph.getVertices()
+                        .map(new VertexToTuple2Map<Long, Long>()), new 
AddValuesMapper());
+
+        result.getVertices().writeAsCsv(resultPath);
+        env.execute();
+
+       expectedResult = "1,2\n" +
+                       "2,4\n" +
+                       "3,6\n" +
+                       "4,8\n" +
+                       "5,10\n";
     }
 
-    @Override
-    protected void preSubmit() throws Exception {
-        resultPath = getTempDirPath("result");
+       @Test
+       public void testWithLessElements() throws Exception {
+       /*
+        * Test joinWithVertices with the input DataSet passed as a parameter 
containing
+        * fewer elements than the vertex DataSet, but of the same type
+        */
+        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        Graph<Long, Long, Long> result = 
graph.joinWithVertices(graph.getVertices().first(3)
+                        .map(new VertexToTuple2Map<Long, Long>()), new 
AddValuesMapper());
+
+        result.getVertices().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,2\n" +
+                       "2,4\n" +
+                       "3,6\n" +
+                       "4,4\n" +
+                       "5,5\n";
     }
 
-    @Override
-    protected void testProgram() throws Exception {
-        expectedResult = GraphProgs.runProgram(curProgId, resultPath);
+       @Test
+       public void testWithDifferentType() throws Exception {
+       /*
+        * Test joinWithVertices with the input DataSet passed as a parameter 
containing
+        * fewer elements than the vertex DataSet and of a different 
type (Boolean)
+        */
+        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        Graph<Long, Long, Long> result = 
graph.joinWithVertices(graph.getVertices().first(3)
+                        .map(new ProjectIdWithTrue()), new 
DoubleIfTrueMapper());
+
+        result.getVertices().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,2\n" +
+                       "2,4\n" +
+                       "3,6\n" +
+                       "4,4\n" +
+                       "5,5\n";
     }
 
-    @Override
-    protected void postSubmit() throws Exception {
-        compareResultsByLinesInMemory(expectedResult, resultPath);
+       @Test
+       public void testWithDifferentKeys() throws Exception {
+               /*
+                * Test joinWithVertices with an input DataSet containing 
different keys than the vertex DataSet
+                * - the iterator becomes empty.
+                */
+        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        Graph<Long, Long, Long> result = 
graph.joinWithVertices(TestGraphUtils.getLongLongTuple2Data(env),
+                new ProjectSecondMapper());
+
+        result.getVertices().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,10\n" +
+                       "2,20\n" +
+                       "3,30\n" +
+                       "4,40\n" +
+                       "5,5\n";
     }
 
-    @Parameterized.Parameters
-    public static Collection<Object[]> getConfigurations() throws IOException {
+       @Test
+       public void testWithCustomType() throws Exception {
+               /*
+                * Test joinWithVertices with a DataSet containing custom 
parametrised type input values
+                */
+        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-        LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+        Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
 
-        for(int i=1; i <= NUM_PROGRAMS; i++) {
-            Configuration config = new Configuration();
-            config.setInteger("ProgramId", i);
-            tConfigs.add(config);
-        }
+        Graph<Long, Long, Long> result = 
graph.joinWithVertices(TestGraphUtils.getLongCustomTuple2Data(env),
+                new CustomValueMapper());
+
+        result.getVertices().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,10\n" +
+                       "2,20\n" +
+                       "3,30\n" +
+                       "4,40\n" +
+                       "5,5\n";
+    }
 
-        return toParameterList(tConfigs);
+       @SuppressWarnings("serial")
+       private static final class AddValuesMapper implements 
MapFunction<Tuple2<Long, Long>, Long> {
+               public Long map(Tuple2<Long, Long> tuple) throws Exception {
+                       return tuple.f0 + tuple.f1;
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class ProjectIdWithTrue implements 
MapFunction<Vertex<Long, Long>, Tuple2<Long, Boolean>> {
+        public Tuple2<Long, Boolean> map(Vertex<Long, Long> vertex) throws 
Exception {
+            return new Tuple2<Long, Boolean>(vertex.getId(), true);
+        }
     }
 
-    private static class GraphProgs {
-
-        @SuppressWarnings("serial")
-        public static String runProgram(int progId, String resultPath) throws 
Exception {
-
-            switch (progId) {
-                case 1: {
-                               /*
-                                * Test joinWithVertices with the input DataSet 
parameter identical
-                                * to the vertex DataSet
-                                */
-                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                    Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                            TestGraphUtils.getLongLongEdgeData(env), env);
-
-                    Graph<Long, Long, Long> result = 
graph.joinWithVertices(graph.getVertices()
-                                    .map(new MapFunction<Vertex<Long, Long>, 
Tuple2<Long, Long>>() {
-                                        @Override
-                                        public Tuple2<Long, Long> 
map(Vertex<Long, Long> vertex) throws Exception {
-                                            return new Tuple2<Long, 
Long>(vertex.getId(), vertex.getValue());
-                                        }
-                                    }),
-                            new MapFunction<Tuple2<Long, Long>, Long>() {
-
-                                @Override
-                                public Long map(Tuple2<Long, Long> tuple) 
throws Exception {
-                                    return tuple.f0 + tuple.f1;
-                                }
-                            });
-
-                    result.getVertices().writeAsCsv(resultPath);
-                    env.execute();
-
-                    return "1,2\n" +
-                            "2,4\n" +
-                            "3,6\n" +
-                            "4,8\n" +
-                            "5,10\n";
-                }
-                case 2: {
-                               /*
-                                * Test joinWithVertices with the input DataSet 
passed as a parameter containing
-                                * less elements than the vertex DataSet, but 
of the same type
-                                */
-                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                    Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                            TestGraphUtils.getLongLongEdgeData(env), env);
-
-                    Graph<Long, Long, Long> result = 
graph.joinWithVertices(graph.getVertices().first(3)
-                                    .map(new MapFunction<Vertex<Long, Long>, 
Tuple2<Long, Long>>() {
-                                        @Override
-                                        public Tuple2<Long, Long> 
map(Vertex<Long, Long> vertex) throws Exception {
-                                            return new Tuple2<Long, 
Long>(vertex.getId(), vertex.getValue());
-                                        }
-                                    }),
-                            new MapFunction<Tuple2<Long, Long>, Long>() {
-
-                                @Override
-                                public Long map(Tuple2<Long, Long> tuple) 
throws Exception {
-                                    return tuple.f0 + tuple.f1;
-                                }
-                            });
-
-                    result.getVertices().writeAsCsv(resultPath);
-                    env.execute();
-
-                    return "1,2\n" +
-                            "2,4\n" +
-                            "3,6\n" +
-                            "4,4\n" +
-                            "5,5\n";
-                }
-                case 3: {
-                               /*
-                                * Test joinWithVertices with the input DataSet 
passed as a parameter containing
-                                * less elements than the vertex DataSet and of 
a different type(Boolean)
-                                */
-                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                    Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                            TestGraphUtils.getLongLongEdgeData(env), env);
-
-                    Graph<Long, Long, Long> result = 
graph.joinWithVertices(graph.getVertices().first(3)
-                                    .map(new MapFunction<Vertex<Long, Long>, 
Tuple2<Long, Boolean>>() {
-                                        @Override
-                                        public Tuple2<Long, Boolean> 
map(Vertex<Long, Long> vertex) throws Exception {
-                                            return new Tuple2<Long, 
Boolean>(vertex.getId(), true);
-                                        }
-                                    }),
-                            new MapFunction<Tuple2<Long, Boolean>, Long>() {
-
-                                @Override
-                                public Long map(Tuple2<Long, Boolean> tuple) 
throws Exception {
-                                    if(tuple.f1) {
-                                        return tuple.f0 * 2;
-                                    }
-                                    else {
-                                        return tuple.f0;
-                                    }
-                                }
-                            });
-
-                    result.getVertices().writeAsCsv(resultPath);
-                    env.execute();
-
-                    return "1,2\n" +
-                            "2,4\n" +
-                            "3,6\n" +
-                            "4,4\n" +
-                            "5,5\n";
-                }
-                case 4: {
-                               /*
-                                * Test joinWithVertices with an input DataSet 
containing different keys than the vertex DataSet
-                                * - the iterator becomes empty.
-                                */
-                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                    Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                            TestGraphUtils.getLongLongEdgeData(env), env);
-
-                    Graph<Long, Long, Long> result = 
graph.joinWithVertices(TestGraphUtils.getLongLongTuple2Data(env),
-                            new MapFunction<Tuple2<Long, Long>, Long>() {
-                                public Long map(Tuple2<Long, Long> tuple) 
throws Exception {
-                                    return tuple.f1;
-                                }
-                            });
-
-                    result.getVertices().writeAsCsv(resultPath);
-                    env.execute();
-
-                    return "1,10\n" +
-                            "2,20\n" +
-                            "3,30\n" +
-                            "4,40\n" +
-                            "5,5\n";
-                }
-                case 5: {
-                               /*
-                                * Test joinWithVertices with a DataSet 
containing custom parametrised type input values
-                                */
-                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                    Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                            TestGraphUtils.getLongLongEdgeData(env), env);
-
-                    Graph<Long, Long, Long> result = 
graph.joinWithVertices(TestGraphUtils.getLongCustomTuple2Data(env),
-                            new MapFunction<Tuple2<Long, 
DummyCustomParameterizedType<Float>>, Long>() {
-                                public Long map(Tuple2<Long, 
DummyCustomParameterizedType<Float>> tuple) throws Exception {
-                                    return (long) tuple.f1.getIntField();
-                                }
-                            });
-
-                    result.getVertices().writeAsCsv(resultPath);
-                    env.execute();
-
-                    return "1,10\n" +
-                            "2,20\n" +
-                            "3,30\n" +
-                            "4,40\n" +
-                            "5,5\n";
-                }
-                default:
-                    throw new IllegalArgumentException("Invalid program id");
+       @SuppressWarnings("serial")
+       private static final class DoubleIfTrueMapper implements 
MapFunction<Tuple2<Long, Boolean>, Long> {
+        public Long map(Tuple2<Long, Boolean> tuple) throws Exception {
+            if(tuple.f1) {
+                return tuple.f0 * 2;
+            }
+            else {
+                return tuple.f0;
             }
         }
     }
+
+       @SuppressWarnings("serial")
+       private static final class ProjectSecondMapper implements 
MapFunction<Tuple2<Long, Long>, Long> {
+        public Long map(Tuple2<Long, Long> tuple) throws Exception {
+            return tuple.f1;
+        }
+    }
+
+       @SuppressWarnings("serial")
+       private static final class CustomValueMapper implements 
MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> {
+        public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> 
tuple) throws Exception {
+            return (long) tuple.f1.getIntField();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e6b9cecd/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java
index bcba7e7..9eccecc 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java
@@ -18,211 +18,206 @@
 
 package org.apache.flink.graph.test;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
 import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
 @RunWith(Parameterized.class)
-public class TestMapEdges extends JavaProgramTestBase {
+public class TestMapEdges extends MultipleProgramsTestBase {
 
-       private static int NUM_PROGRAMS = 5;
-       
-       private int curProgId = config.getInteger("ProgramId", -1);
-       private String resultPath;
-       private String expectedResult;
-       
-       public TestMapEdges(Configuration config) {
-               super(config);
-       }
-       
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
+       public TestMapEdges(MultipleProgramsTestBase.ExecutionMode mode){
+               super(mode);
        }
 
-       @Override
-       protected void testProgram() throws Exception {
-               expectedResult = GraphProgs.runProgram(curProgId, resultPath);
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception{
+               resultPath = tempFolder.newFile().toURI().toString();
        }
-       
-       @Override
-       protected void postSubmit() throws Exception {
+
+       @After
+       public void after() throws Exception{
                compareResultsByLinesInMemory(expectedResult, resultPath);
        }
-       
-       @Parameters
-       public static Collection<Object[]> getConfigurations() throws 
FileNotFoundException, IOException {
-
-               LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
 
-               for(int i=1; i <= NUM_PROGRAMS; i++) {
-                       Configuration config = new Configuration();
-                       config.setInteger("ProgramId", i);
-                       tConfigs.add(config);
-               }
+       @Test
+       public void testWithSameValue() throws Exception {
+               /*
+                * Test mapEdges() keeping the same value type
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                
-               return toParameterList(tConfigs);
-       }
-       
-       private static class GraphProgs {
+               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                               TestGraphUtils.getLongLongEdgeData(env), env);
                
-               @SuppressWarnings("serial")
-               public static String runProgram(int progId, String resultPath) 
throws Exception {
-                       
-                       switch(progId) {
-                       case 1: {
-                               /*
-                                * Test mapEdges() keeping the same value type
-                                */
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                                               
TestGraphUtils.getLongLongEdgeData(env), env);
-                               
-                               DataSet<Edge<Long, Long>> mappedEdges = 
graph.mapEdges(new MapFunction<Edge<Long, Long>, Long>() {
-                                       public Long map(Edge<Long, Long> edge) 
throws Exception {
-                                               return edge.getValue()+1;
-                                       }
-                               }).getEdges();
-                               
-                               mappedEdges.writeAsCsv(resultPath);
-                               env.execute();
-                               return "1,2,13\n" +
+               DataSet<Edge<Long, Long>> mappedEdges = graph.mapEdges(new 
AddOneMapper()).getEdges();
+               
+               mappedEdges.writeAsCsv(resultPath);
+               env.execute();
+               expectedResult = "1,2,13\n" +
                                "1,3,14\n" +
                                "2,3,24\n" +
                                "3,4,35\n" +
                                "3,5,36\n" + 
                                "4,5,46\n" + 
                                "5,1,52\n";
-                       }
-                       case 2: {
-                               /*
-                                * Test mapEdges() and change the value type to 
String
-                                */
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                                               
TestGraphUtils.getLongLongEdgeData(env), env);
-                               
-                               DataSet<Edge<Long, String>> mappedEdges = 
graph.mapEdges(new MapFunction<Edge<Long, Long>, String>() {
-                                       public String map(Edge<Long, Long> 
edge) throws Exception {
-                                               return 
String.format("string(%d)", edge.getValue());
-                                       }
-                               }).getEdges();
-                               
-                               mappedEdges.writeAsCsv(resultPath);
-                               env.execute();
-                               return "1,2,string(12)\n" +
+       }
+
+       @Test
+       public void testWithStringValue() throws Exception {
+               /*
+                * Test mapEdges() and change the value type to String
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                               TestGraphUtils.getLongLongEdgeData(env), env);
+               
+               DataSet<Edge<Long, String>> mappedEdges = graph.mapEdges(new 
ToStringMapper()).getEdges();
+               
+               mappedEdges.writeAsCsv(resultPath);
+               env.execute();
+               expectedResult = "1,2,string(12)\n" +
                                "1,3,string(13)\n" +
                                "2,3,string(23)\n" +
                                "3,4,string(34)\n" +
                                "3,5,string(35)\n" + 
                                "4,5,string(45)\n" + 
                                "5,1,string(51)\n";
-                       }
-                       case 3: {
-                               /*
-                                * Test mapEdges() and change the value type to 
a Tuple1
-                                */
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                                               
TestGraphUtils.getLongLongEdgeData(env), env);
-                               
-                               DataSet<Edge<Long, Tuple1<Long>>> mappedEdges = 
graph.mapEdges(new MapFunction<Edge<Long, Long>, 
-                                               Tuple1<Long>>() {
-                                       public Tuple1<Long> map(Edge<Long, 
Long> edge) throws Exception {
-                                               Tuple1<Long> tupleValue = new 
Tuple1<Long>();
-                                               
tupleValue.setFields(edge.getValue());
-                                               return tupleValue;
-                                       }
-                               }).getEdges();
-                               
-                               mappedEdges.writeAsCsv(resultPath);
-                               env.execute();
-                               return "1,2,(12)\n" +
+       }
+
+       @Test
+       public void testWithTuple1Type() throws Exception {
+               /*
+                * Test mapEdges() and change the value type to a Tuple1
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                               TestGraphUtils.getLongLongEdgeData(env), env);
+               
+               DataSet<Edge<Long, Tuple1<Long>>> mappedEdges = 
graph.mapEdges(new ToTuple1Mapper()).getEdges();
+               
+               mappedEdges.writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult = "1,2,(12)\n" +
                                "1,3,(13)\n" +
                                "2,3,(23)\n" +
                                "3,4,(34)\n" +
                                "3,5,(35)\n" + 
                                "4,5,(45)\n" + 
                                "5,1,(51)\n";
-                       }
-                       case 4: {
-                               /*
-                                * Test mapEdges() and change the value type to 
a custom type
-                                */
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                                               
TestGraphUtils.getLongLongEdgeData(env), env);
-                               
-                               DataSet<Edge<Long, DummyCustomType>> 
mappedEdges = graph.mapEdges(new MapFunction<Edge<Long, Long>, 
-                                               DummyCustomType>() {
-                                       public DummyCustomType map(Edge<Long, 
Long> edge) throws Exception {
-                                               DummyCustomType dummyValue = 
new DummyCustomType();
-                                               
dummyValue.setIntField(edge.getValue().intValue());                             
                
-                                               return dummyValue;
-                                       }
-                               }).getEdges();
-                               
-                               mappedEdges.writeAsCsv(resultPath);
-                               env.execute();
-                               return "1,2,(T,12)\n" +
-                               "1,3,(T,13)\n" +
-                               "2,3,(T,23)\n" +
-                               "3,4,(T,34)\n" +
-                               "3,5,(T,35)\n" + 
-                               "4,5,(T,45)\n" + 
-                               "5,1,(T,51)\n";
-                       }
-                       case 5: {
-                               /*
-                                * Test mapEdges() and change the value type to 
a parameterized custom type
-                                */
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                                               
TestGraphUtils.getLongLongEdgeData(env), env);
-                               
-                               DataSet<Edge<Long, 
DummyCustomParameterizedType<Double>>> mappedEdges = graph.mapEdges(
-                                               new MapFunction<Edge<Long, 
Long>, DummyCustomParameterizedType<Double>>() {
-                                       public 
DummyCustomParameterizedType<Double> map(Edge<Long, Long> edge) throws 
Exception {
-                                               
DummyCustomParameterizedType<Double> dummyValue = new 
DummyCustomParameterizedType<Double>();
-                                               
dummyValue.setIntField(edge.getValue().intValue());
-                                               dummyValue.setTField(new 
Double(edge.getValue()));                                              
-                                               return dummyValue;
-                                       }
-                               }).getEdges();
-                               
-                               mappedEdges.writeAsCsv(resultPath);
-                               env.execute();
-                               return "1,2,(12.0,12)\n" +
-                               "1,3,(13.0,13)\n" +
-                               "2,3,(23.0,23)\n" +
-                               "3,4,(34.0,34)\n" +
-                               "3,5,(35.0,35)\n" + 
-                               "4,5,(45.0,45)\n" + 
-                               "5,1,(51.0,51)\n";
-                       }
-                       default: 
-                               throw new IllegalArgumentException("Invalid 
program id");
-                       }
+       }
+
+       @Test
+       public void testWithCustomType() throws Exception {
+               /*
+                * Test mapEdges() and change the value type to a custom type
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                               TestGraphUtils.getLongLongEdgeData(env), env);
+               
+               DataSet<Edge<Long, DummyCustomType>> mappedEdges = 
graph.mapEdges(new ToCustomTypeMapper()).getEdges();
+               
+               mappedEdges.writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult = "1,2,(T,12)\n" +
+                       "1,3,(T,13)\n" +
+                       "2,3,(T,23)\n" +
+                       "3,4,(T,34)\n" +
+                       "3,5,(T,35)\n" + 
+                       "4,5,(T,45)\n" + 
+                       "5,1,(T,51)\n";
+       }
+
+       @Test
+       public void testWithParametrizedCustomType() throws Exception {
+               /*
+                * Test mapEdges() and change the value type to a parameterized 
custom type
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                               TestGraphUtils.getLongLongEdgeData(env), env);
+               
+               DataSet<Edge<Long, DummyCustomParameterizedType<Double>>> 
mappedEdges = graph.mapEdges(
+                               new 
ToCustomParametrizedTypeMapper()).getEdges();
+               
+               mappedEdges.writeAsCsv(resultPath);
+               env.execute();
+       
+               expectedResult = "1,2,(12.0,12)\n" +
+                       "1,3,(13.0,13)\n" +
+                       "2,3,(23.0,23)\n" +
+                       "3,4,(34.0,34)\n" +
+                       "3,5,(35.0,35)\n" + 
+                       "4,5,(45.0,45)\n" + 
+                       "5,1,(51.0,51)\n";
+       }
+
+       @SuppressWarnings("serial")
+       private static final class AddOneMapper implements 
MapFunction<Edge<Long, Long>, Long> {
+               public Long map(Edge<Long, Long> edge) throws Exception {
+                       return edge.getValue()+1;
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class ToStringMapper implements 
MapFunction<Edge<Long, Long>, String> {
+               public String map(Edge<Long, Long> edge) throws Exception {
+                       return String.format("string(%d)", edge.getValue());
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class ToTuple1Mapper implements 
MapFunction<Edge<Long, Long>, Tuple1<Long>> {
+               public Tuple1<Long> map(Edge<Long, Long> edge) throws Exception 
{
+                       Tuple1<Long> tupleValue = new Tuple1<Long>();
+                       tupleValue.setFields(edge.getValue());
+                       return tupleValue;
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class ToCustomTypeMapper implements 
MapFunction<Edge<Long, Long>, DummyCustomType> {
+               public DummyCustomType map(Edge<Long, Long> edge) throws 
Exception {
+                       DummyCustomType dummyValue = new DummyCustomType();
+                       dummyValue.setIntField(edge.getValue().intValue());     
                                        
+                       return dummyValue;
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class ToCustomParametrizedTypeMapper implements 
MapFunction<Edge<Long, Long>, 
+               DummyCustomParameterizedType<Double>> {
+
+               public DummyCustomParameterizedType<Double> map(Edge<Long, 
Long> edge) throws Exception {
+                       DummyCustomParameterizedType<Double> dummyValue = new 
DummyCustomParameterizedType<Double>();
+                       dummyValue.setIntField(edge.getValue().intValue());
+                       dummyValue.setTField(new Double(edge.getValue()));      
                                        
+                       return dummyValue;
                }
        }
-}
+}
\ No newline at end of file

Reply via email to