http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
index bc026c9..6fe549f 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
@@ -18,82 +18,68 @@
 
 package org.apache.flink.test.exampleScalaPrograms;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
+import java.io.File;
 
-import org.apache.flink.configuration.Configuration;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
 import org.apache.flink.examples.scala.graph.PageRankBasic;
 import org.apache.flink.test.testdata.PageRankData;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
 @RunWith(Parameterized.class)
-public class PageRankITCase extends JavaProgramTestBase {
-       
-       private static int NUM_PROGRAMS = 2;
-       
-       private int curProgId = config.getInteger("ProgramId", -1);
-       
+public class PageRankITCase extends MultipleProgramsTestBase {
+
+       public PageRankITCase(ExecutionMode mode){
+               super(mode);
+       }
+
        private String verticesPath;
        private String edgesPath;
        private String resultPath;
-       private String expectedResult;
-       
-       public PageRankITCase(Configuration config) {
-               super(config);
-       }
-       
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-               verticesPath = createTempFile("vertices.txt", 
PageRankData.VERTICES);
-               edgesPath = createTempFile("edges.txt", PageRankData.EDGES);
-       }
+       private String expected;
 
-       @Override
-       protected void testProgram() throws Exception {
-               expectedResult = runProgram(curProgId);
-       }
-       
-       @Override
-       protected void postSubmit() throws Exception {
-               compareKeyValueParisWithDelta(expectedResult, resultPath, " ", 
0.01);
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception{
+               File resultFile = tempFolder.newFile();
+               //Delete file because the Scala API does not respect WriteMode 
set by the configuration
+               resultFile.delete();
+               resultPath = resultFile.toURI().toString();
+
+               File verticesFile = tempFolder.newFile();
+               Files.write(PageRankData.VERTICES, verticesFile, 
Charsets.UTF_8);
+
+               File edgesFile = tempFolder.newFile();
+               Files.write(PageRankData.EDGES, edgesFile, Charsets.UTF_8);
+
+               verticesPath = verticesFile.toURI().toString();
+               edgesPath = edgesFile.toURI().toString();
        }
-       
-       @Parameters
-       public static Collection<Object[]> getConfigurations() throws 
FileNotFoundException, IOException {
 
-               LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
+       @After
+       public void after() throws Exception{
+               compareKeyValueParisWithDelta(expected, resultPath, " ", 0.01);
+       }
 
-               for(int i=1; i <= NUM_PROGRAMS; i++) {
-                       Configuration config = new Configuration();
-                       config.setInteger("ProgramId", i);
-                       tConfigs.add(config);
-               }
-               
-               return toParameterList(tConfigs);
+       @Test
+       public void testPageRankWithSmallNumberOfIterations() throws Exception {
+               PageRankBasic.main(new String[] {verticesPath, edgesPath, 
resultPath, PageRankData.NUM_VERTICES+"", "3"});
+               expected = PageRankData.RANKS_AFTER_3_ITERATIONS;
        }
-       
 
-       public String runProgram(int progId) throws Exception {
-               
-               switch(progId) {
-               case 1: {
-                       PageRankBasic.main(new String[] {verticesPath, 
edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "3"});
-                       return PageRankData.RANKS_AFTER_3_ITERATIONS;
-               }
-               case 2: {
-                       // start with a very high number of iteration such that 
the dynamic convergence criterion must handle termination
-                       PageRankBasic.main(new String[] {verticesPath, 
edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"});
-                       return 
PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
-               }
-               
-               default: 
-                       throw new IllegalArgumentException("Invalid program 
id");
-               }
+       @Test
+       public void testPageRankWithConvergence() throws Exception {
+               // start with a very high number of iteration such that the 
dynamic convergence criterion must handle termination
+               PageRankBasic.main(new String[] {verticesPath, edgesPath, 
resultPath, PageRankData.NUM_VERTICES+"", "1000"});
+               expected = PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
index 674ca49..aae7168 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
@@ -18,12 +18,10 @@
 
 package org.apache.flink.test.iterative.aggregators;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
 import java.util.Random;
 
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
 import org.junit.Assert;
 
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
@@ -33,12 +31,14 @@ import 
org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -49,213 +49,185 @@ import 
org.apache.flink.api.java.operators.IterativeDataSet;
  *
  */
 @RunWith(Parameterized.class)
-public class AggregatorsITCase extends JavaProgramTestBase {
+public class AggregatorsITCase extends MultipleProgramsTestBase {
 
-       private static final int NUM_PROGRAMS = 5;
-       private static final int MAX_ITERATIONS = 20;   
+       private static final int MAX_ITERATIONS = 20;
        private static final int DOP = 2;
+       private static final String NEGATIVE_ELEMENTS_AGGR = 
"count.negative.elements";
+
+       public AggregatorsITCase(ExecutionMode mode){
+               super(mode);
+       }
 
-       private int curProgId = config.getInteger("ProgramId", -1);
        private String resultPath;
-       private String expectedResult;
+       private String expected;
 
-       public AggregatorsITCase(Configuration config) {
-               super(config);
-       }
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
 
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
+       @Before
+       public void before() throws Exception{
+               resultPath = tempFolder.newFile().toURI().toString();
        }
 
-       @Override
-       protected void testProgram() throws Exception {
-               expectedResult = AggregatorProgs.runProgram(curProgId, 
resultPath);
+       @After
+       public void after() throws Exception{
+               compareResultsByLinesInMemory(expected, resultPath);
        }
 
-       @Override
-       protected void postSubmit() throws Exception {
+       @Test
+       public void testAggregatorWithoutParameterForIterate() throws Exception 
{
+               /*
+                * Test aggregator without parameter for iterate
+                */
 
-               compareResultsByLinesInMemory(expectedResult, resultPath);
-       }
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(DOP);
 
-       @Parameters
-       public static Collection<Object[]> getConfigurations() throws 
FileNotFoundException, IOException {
+               DataSet<Integer> initialSolutionSet = 
CollectionDataSets.getIntegerDataSet(env);
+               IterativeDataSet<Integer> iteration = 
initialSolutionSet.iterate(MAX_ITERATIONS);
 
-               LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
+               // register aggregator
+               LongSumAggregator aggr = new LongSumAggregator();
+               iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
 
-               for(int i=1; i <= NUM_PROGRAMS; i++) {
-                       Configuration config = new Configuration();
-                       config.setInteger("ProgramId", i);
-                       tConfigs.add(config);
-               }
+               // register convergence criterion
+               
iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr,
+                               new NegativeElementsConvergenceCriterion());
+
+               DataSet<Integer> updatedDs = iteration.map(new 
SubtractOneMap());
+               iteration.closeWith(updatedDs).writeAsText(resultPath);
+               env.execute();
 
-               return toParameterList(tConfigs);
+               expected =  "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
+                               + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
+                               + "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
        }
 
-       private static class AggregatorProgs {
+       @Test
+       public void testAggregatorWithParameterForIterate() throws Exception {
+               /*
+                * Test aggregator with parameter for iterate
+                */
 
-               private static final String NEGATIVE_ELEMENTS_AGGR = 
"count.negative.elements";
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(DOP);
 
-               public static String runProgram(int progId, String resultPath) 
throws Exception {
+               DataSet<Integer> initialSolutionSet = 
CollectionDataSets.getIntegerDataSet(env);
+               IterativeDataSet<Integer> iteration = 
initialSolutionSet.iterate(MAX_ITERATIONS);
 
-                       switch(progId) {
-                       case 1: {
-                               /*
-                                * Test aggregator without parameter for iterate
-                                */
+               // register aggregator
+               LongSumAggregatorWithParameter aggr = new 
LongSumAggregatorWithParameter(0);
+               iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
 
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               env.setDegreeOfParallelism(DOP);
-
-                               DataSet<Integer> initialSolutionSet = 
CollectionDataSets.getIntegerDataSet(env);
-                               IterativeDataSet<Integer> iteration = 
initialSolutionSet.iterate(MAX_ITERATIONS);
-
-                               // register aggregator
-                               LongSumAggregator aggr = new 
LongSumAggregator();
-                               
iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
-                               
-                               // register convergence criterion
-                               
iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr, 
-                                               new 
NegativeElementsConvergenceCriterion());
-                               
-                               DataSet<Integer> updatedDs = iteration.map(new 
SubtractOneMap());
-                               
iteration.closeWith(updatedDs).writeAsText(resultPath);
-                               env.execute();
-
-                               // return expected result
-                               return "-3\n" + "-2\n" + "-2\n" + "-1\n" + 
"-1\n"
-                                               + "-1\n" + "0\n" + "0\n" + 
"0\n" + "0\n"
-                                               + "1\n" + "1\n" + "1\n" + "1\n" 
+ "1\n";
-                       }
-                       case 2: {
-                               /*
-                                * Test aggregator with parameter for iterate
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               env.setDegreeOfParallelism(DOP);
-
-                               DataSet<Integer> initialSolutionSet = 
CollectionDataSets.getIntegerDataSet(env);
-                               IterativeDataSet<Integer> iteration = 
initialSolutionSet.iterate(MAX_ITERATIONS);
-
-                               // register aggregator
-                               LongSumAggregatorWithParameter aggr = new 
LongSumAggregatorWithParameter(0);
-                               
iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
-                               
-                               // register convergence criterion
-                               
iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr, 
-                                               new 
NegativeElementsConvergenceCriterion());
-                               
-                               DataSet<Integer> updatedDs = iteration.map(new 
SubtractOneMapWithParam());
-                               
iteration.closeWith(updatedDs).writeAsText(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "-3\n" + "-2\n" + "-2\n" + "-1\n" + 
"-1\n"
-                                               + "-1\n" + "0\n" + "0\n" + 
"0\n" + "0\n"
-                                               + "1\n" + "1\n" + "1\n" + "1\n" 
+ "1\n";
-                       }
-                       case 3: {
-                               /*
+               // register convergence criterion
+               
iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr,
+                               new NegativeElementsConvergenceCriterion());
+
+               DataSet<Integer> updatedDs = iteration.map(new 
SubtractOneMapWithParam());
+               iteration.closeWith(updatedDs).writeAsText(resultPath);
+               env.execute();
+
+               expected =  "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
+                               + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
+                               + "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+       }
+
+       @Test
+       public void testConvergenceCriterionWithParameterForIterate() throws 
Exception {
+               /*
                                 * Test convergence criterion with parameter 
for iterate
                                 */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               env.setDegreeOfParallelism(DOP);
-
-                               DataSet<Integer> initialSolutionSet = 
CollectionDataSets.getIntegerDataSet(env);
-                               IterativeDataSet<Integer> iteration = 
initialSolutionSet.iterate(MAX_ITERATIONS);
-
-                               // register aggregator
-                               LongSumAggregator aggr = new 
LongSumAggregator();
-                               
iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
-                               
-                               // register convergence criterion
-                               
iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr, 
-                                               new 
NegativeElementsConvergenceCriterionWithParam(3));
-                               
-                               DataSet<Integer> updatedDs = iteration.map(new 
SubtractOneMap());
-                               
iteration.closeWith(updatedDs).writeAsText(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "-3\n" + "-2\n" + "-2\n" + "-1\n" + 
"-1\n"
-                                               + "-1\n" + "0\n" + "0\n" + 
"0\n" + "0\n"
-                                               + "1\n" + "1\n" + "1\n" + "1\n" 
+ "1\n";
-                       }
-                       case 4: {
-                               /*
-                                * Test aggregator without parameter for 
iterateDelta
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               env.setDegreeOfParallelism(DOP);
-                               
-                               DataSet<Tuple2<Integer, Integer>> 
initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map(new 
TupleMakerMap());
-                                               
-                               DeltaIteration<Tuple2<Integer, Integer>, 
Tuple2<Integer, Integer>> iteration = initialSolutionSet.iterateDelta(
-                                               initialSolutionSet, 
MAX_ITERATIONS, 0);
-
-                               // register aggregator
-                               LongSumAggregator aggr = new 
LongSumAggregator();
-                               
iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
-                               
-                               DataSet<Tuple2<Integer, Integer>> updatedDs = 
iteration.getWorkset().map(new AggregateMapDelta());
-                               
-                               DataSet<Tuple2<Integer, Integer>> newElements = 
updatedDs.join(iteration.getSolutionSet())
-                                               
.where(0).equalTo(0).flatMap(new UpdateFilter());
-                               
-                               DataSet<Tuple2<Integer, Integer>> iterationRes 
= iteration.closeWith(newElements, newElements);
-                               DataSet<Integer> result = iterationRes.map(new 
ProjectSecondMapper());
-                               result.writeAsText(resultPath);
-                               
-                               env.execute();
-                               
-                               // return expected result
-                               return "1\n" + "2\n" + "2\n" + "3\n" + "3\n"
-                                               + "3\n" + "4\n" + "4\n" + "4\n" 
+ "4\n"
-                                               + "5\n" + "5\n" + "5\n" + "5\n" 
+ "5\n";
-                               
-                       }
-                       case 5: {
-                               /*
-                                * Test aggregator with parameter for 
iterateDelta
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               env.setDegreeOfParallelism(DOP);
-                               
-                               DataSet<Tuple2<Integer, Integer>> 
initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map(new 
TupleMakerMap());
-                                               
-                               DeltaIteration<Tuple2<Integer, Integer>, 
Tuple2<Integer, Integer>> iteration = initialSolutionSet.iterateDelta(
-                                               initialSolutionSet, 
MAX_ITERATIONS, 0);
-
-                               // register aggregator
-                               LongSumAggregator aggr = new 
LongSumAggregatorWithParameter(4);
-                               
iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
-                               
-                               DataSet<Tuple2<Integer, Integer>> updatedDs = 
iteration.getWorkset().map(new AggregateMapDelta());
-                               
-                               DataSet<Tuple2<Integer, Integer>> newElements = 
updatedDs.join(iteration.getSolutionSet())
-                                               
.where(0).equalTo(0).flatMap(new UpdateFilter());
-                               
-                               DataSet<Tuple2<Integer, Integer>> iterationRes 
= iteration.closeWith(newElements, newElements);
-                               DataSet<Integer> result = iterationRes.map(new 
ProjectSecondMapper());
-                               result.writeAsText(resultPath);
-                               
-                               env.execute();
-                               
-                               // return expected result
-                               return "1\n" + "2\n" + "2\n" + "3\n" + "3\n"
-                                               + "3\n" + "4\n" + "4\n" + "4\n" 
+ "4\n"
-                                               + "5\n" + "5\n" + "5\n" + "5\n" 
+ "5\n";
-                       }
-                       default:
-                               throw new IllegalArgumentException("Invalid 
program id");
-                       }
 
-               }
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(DOP);
+
+               DataSet<Integer> initialSolutionSet = 
CollectionDataSets.getIntegerDataSet(env);
+               IterativeDataSet<Integer> iteration = 
initialSolutionSet.iterate(MAX_ITERATIONS);
+
+               // register aggregator
+               LongSumAggregator aggr = new LongSumAggregator();
+               iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
+
+               // register convergence criterion
+               
iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr,
+                               new 
NegativeElementsConvergenceCriterionWithParam(3));
+
+               DataSet<Integer> updatedDs = iteration.map(new 
SubtractOneMap());
+               iteration.closeWith(updatedDs).writeAsText(resultPath);
+               env.execute();
+
+               expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
+                               + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
+                               + "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+       }
+
+       @Test
+       public void testAggregatorWithoutParameterForIterateDelta() throws 
Exception {
+               /*
+                * Test aggregator without parameter for iterateDelta
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(DOP);
+
+               DataSet<Tuple2<Integer, Integer>> initialSolutionSet = 
CollectionDataSets.getIntegerDataSet(env).map(new TupleMakerMap());
+
+               DeltaIteration<Tuple2<Integer, Integer>, Tuple2<Integer, 
Integer>> iteration = initialSolutionSet.iterateDelta(
+                               initialSolutionSet, MAX_ITERATIONS, 0);
+
+               // register aggregator
+               LongSumAggregator aggr = new LongSumAggregator();
+               iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
+
+               DataSet<Tuple2<Integer, Integer>> updatedDs = 
iteration.getWorkset().map(new AggregateMapDelta());
+
+               DataSet<Tuple2<Integer, Integer>> newElements = 
updatedDs.join(iteration.getSolutionSet())
+                               .where(0).equalTo(0).flatMap(new 
UpdateFilter());
+
+               DataSet<Tuple2<Integer, Integer>> iterationRes = 
iteration.closeWith(newElements, newElements);
+               DataSet<Integer> result = iterationRes.map(new 
ProjectSecondMapper());
+               result.writeAsText(resultPath);
+
+               env.execute();
+
+               expected = "1\n" + "2\n" + "2\n" + "3\n" + "3\n"
+                               + "3\n" + "4\n" + "4\n" + "4\n" + "4\n"
+                               + "5\n" + "5\n" + "5\n" + "5\n" + "5\n";
+       }
+
+       @Test
+       public void testAggregatorWithParameterForIterateDelta() throws 
Exception {
+               /*
+                * Test aggregator with parameter for iterateDelta
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(DOP);
+
+               DataSet<Tuple2<Integer, Integer>> initialSolutionSet = 
CollectionDataSets.getIntegerDataSet(env).map(new TupleMakerMap());
+
+               DeltaIteration<Tuple2<Integer, Integer>, Tuple2<Integer, 
Integer>> iteration = initialSolutionSet.iterateDelta(
+                               initialSolutionSet, MAX_ITERATIONS, 0);
+
+               // register aggregator
+               LongSumAggregator aggr = new LongSumAggregatorWithParameter(4);
+               iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
+
+               DataSet<Tuple2<Integer, Integer>> updatedDs = 
iteration.getWorkset().map(new AggregateMapDelta());
+
+               DataSet<Tuple2<Integer, Integer>> newElements = 
updatedDs.join(iteration.getSolutionSet())
+                               .where(0).equalTo(0).flatMap(new 
UpdateFilter());
+
+               DataSet<Tuple2<Integer, Integer>> iterationRes = 
iteration.closeWith(newElements, newElements);
+               DataSet<Integer> result = iterationRes.map(new 
ProjectSecondMapper());
+               result.writeAsText(resultPath);
+
+               env.execute();
+
+               expected = "1\n" + "2\n" + "2\n" + "3\n" + "3\n"
+                               + "3\n" + "4\n" + "4\n" + "4\n" + "4\n"
+                               + "5\n" + "5\n" + "5\n" + "5\n" + "5\n";
        }
 
        @SuppressWarnings("serial")
@@ -294,7 +266,7 @@ public class AggregatorsITCase extends JavaProgramTestBase {
                @Override
                public void open(Configuration conf) {
 
-                       aggr = 
getIterationRuntimeContext().getIterationAggregator(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR);
+                       aggr = 
getIterationRuntimeContext().getIterationAggregator(NEGATIVE_ELEMENTS_AGGR);
                }
 
                @Override
@@ -316,7 +288,7 @@ public class AggregatorsITCase extends JavaProgramTestBase {
                @Override
                public void open(Configuration conf) {
 
-                       aggr = 
getIterationRuntimeContext().getIterationAggregator(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR);
+                       aggr = 
getIterationRuntimeContext().getIterationAggregator(NEGATIVE_ELEMENTS_AGGR);
                }
 
                @Override
@@ -366,11 +338,11 @@ public class AggregatorsITCase extends 
JavaProgramTestBase {
                @Override
                public void open(Configuration conf) {
 
-                       aggr = 
getIterationRuntimeContext().getIterationAggregator(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR);
+                       aggr = 
getIterationRuntimeContext().getIterationAggregator(NEGATIVE_ELEMENTS_AGGR);
                        superstep = 
getIterationRuntimeContext().getSuperstepNumber();
 
                        if (superstep > 1) {
-                               previousAggr = 
getIterationRuntimeContext().getPreviousIterationAggregate(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR);
+                               previousAggr = 
getIterationRuntimeContext().getPreviousIterationAggregate(NEGATIVE_ELEMENTS_AGGR);
                                // check previous aggregator value
                                Assert.assertEquals(superstep - 1, 
previousAggr.getValue());
                        }
@@ -429,11 +401,11 @@ public class AggregatorsITCase extends 
JavaProgramTestBase {
                @Override
                public void open(Configuration conf) {
 
-                       aggr = 
getIterationRuntimeContext().getIterationAggregator(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR);
+                       aggr = 
getIterationRuntimeContext().getIterationAggregator(NEGATIVE_ELEMENTS_AGGR);
                        superstep = 
getIterationRuntimeContext().getSuperstepNumber();
 
                        if (superstep > 1) {
-                               previousAggr = 
getIterationRuntimeContext().getPreviousIterationAggregate(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR);
+                               previousAggr = 
getIterationRuntimeContext().getPreviousIterationAggregate(NEGATIVE_ELEMENTS_AGGR);
 
                                // check previous aggregator value
                                switch(superstep) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
index 61ad863..3fbcae6 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
@@ -18,135 +18,107 @@
 
 package org.apache.flink.test.javaApiOperators;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
 @RunWith(Parameterized.class)
-public class AggregateITCase extends JavaProgramTestBase {
-       
-       private static int NUM_PROGRAMS = 3;
-       
-       private int curProgId = config.getInteger("ProgramId", -1);
-       private String resultPath;
-       private String expectedResult;
-       
-       public AggregateITCase(Configuration config) {
-               super(config);
-       }
-       
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
+public class AggregateITCase extends MultipleProgramsTestBase {
 
-       @Override
-       protected void testProgram() throws Exception {
-               expectedResult = AggregateProgs.runProgram(curProgId, 
resultPath);
+
+       public AggregateITCase(ExecutionMode mode){
+               super(mode);
        }
-       
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(expectedResult, resultPath);
+
+       private String resultPath;
+       private String expected;
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception{
+               resultPath = tempFolder.newFile().toURI().toString();
        }
-       
-       @Parameters
-       public static Collection<Object[]> getConfigurations() throws 
FileNotFoundException, IOException {
-
-               LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
-
-               for(int i=1; i <= NUM_PROGRAMS; i++) {
-                       Configuration config = new Configuration();
-                       config.setInteger("ProgramId", i);
-                       tConfigs.add(config);
-               }
-               
-               return toParameterList(tConfigs);
+
+       @After
+       public void after() throws Exception{
+               compareResultsByLinesInMemory(expected, resultPath);
        }
-       
-       private static class AggregateProgs {
-               
-               public static String runProgram(int progId, String resultPath) 
throws Exception {
-                       
-                       switch(progId) {
-                       case 1: {
-                               /*
+
+       @Test
+       public void testFullAggregate() throws Exception {
+               /*
                                 * Full Aggregate
                                 */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-                               DataSet<Tuple2<Integer, Long>> aggregateDs = ds
-                                               .aggregate(Aggregations.SUM, 0)
-                                               .and(Aggregations.MAX, 1)
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple2<Integer, Long>> aggregateDs = ds
+                               .aggregate(Aggregations.SUM, 0)
+                               .and(Aggregations.MAX, 1)
                                                .project(0, 1);
-                               
-                               aggregateDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "231,6\n";
-                       }
-                       case 2: {
-                               /*
+
+               aggregateDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "231,6\n";
+       }
+
+       @Test
+       public void testGroupedAggregate() throws Exception {
+               /*
                                 * Grouped Aggregate
                                 */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-                               DataSet<Tuple2<Long, Integer>> aggregateDs = 
ds.groupBy(1)
-                                               .aggregate(Aggregations.SUM, 0)
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
+                               .aggregate(Aggregations.SUM, 0)
                                                .project(1, 0);
-                               
-                               aggregateDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "1,1\n" +
+
+               aggregateDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "1,1\n" +
                                "2,5\n" +
                                "3,15\n" +
                                "4,34\n" +
                                "5,65\n" +
                                "6,111\n";
-                       } 
-                       case 3: {
-                               /*
-                                * Nested Aggregate
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-                               DataSet<Tuple1<Integer>> aggregateDs = 
ds.groupBy(1)
-                                               .aggregate(Aggregations.MIN, 0)
-                                               .aggregate(Aggregations.MIN, 0)
+       }
+
+       @Test
+       public void testNestedAggregate() throws Exception {
+               /*
+                * Nested Aggregate
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1)
+                               .aggregate(Aggregations.MIN, 0)
+                               .aggregate(Aggregations.MIN, 0)
                                                .project(0);
-                               
-                               aggregateDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "1\n";
-                       }
-                       default: 
-                               throw new IllegalArgumentException("Invalid 
program id");
-                       }
-               }
+
+               aggregateDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "1\n";
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
index ffc208c..b249e22 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
@@ -18,11 +18,8 @@
 
 package org.apache.flink.test.javaApiOperators;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.CoGroupFunction;
@@ -39,478 +36,486 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
 @RunWith(Parameterized.class)
-public class CoGroupITCase extends JavaProgramTestBase {
-       
-       private static int NUM_PROGRAMS = 13;
-       
-       private int curProgId = config.getInteger("ProgramId", -1);
+public class CoGroupITCase extends MultipleProgramsTestBase {
+
+       public CoGroupITCase(ExecutionMode mode){
+               super(mode);
+       }
+
        private String resultPath;
-       private String expectedResult;
-       
-       public CoGroupITCase(Configuration config) {
-               super(config);
+       private String expected;
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception{
+               resultPath = tempFolder.newFile().toURI().toString();
        }
-       
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
+
+       @After
+       public void after() throws Exception{
+               compareResultsByLinesInMemory(expected, resultPath);
        }
 
-       @Override
-       protected void testProgram() throws Exception {
-               expectedResult = CoGroupProgs.runProgram(curProgId, resultPath);
+       @Test
+       public void testCoGroupTuplesWithKeyFieldSelector() throws Exception {
+               /*
+                                * CoGroup on tuples with key field selector
+                                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple2<Integer, Integer>> coGroupDs = 
ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5CoGroup());
+
+               coGroupDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "1,0\n" +
+                               "2,6\n" +
+                               "3,24\n" +
+                               "4,60\n" +
+                               "5,120\n";
        }
-       
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(expectedResult, resultPath);
+
+       @Test
+       public void testCoGroupOnTwoCustomTypeInputsWithKeyExtractors() throws 
Exception {
+               /*
+                                * CoGroup on two custom type inputs with key 
extractors
+                                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<CustomType> ds = 
CollectionDataSets.getCustomTypeDataSet(env);
+               DataSet<CustomType> ds2 = 
CollectionDataSets.getCustomTypeDataSet(env);
+               DataSet<CustomType> coGroupDs = ds.coGroup(ds2).where(new 
KeySelector4()).equalTo(new
+                               KeySelector5()).with(new CustomTypeCoGroup());
+
+               coGroupDs.writeAsText(resultPath);
+               env.execute();
+
+               expected = "1,0,test\n" +
+                               "2,6,test\n" +
+                               "3,24,test\n" +
+                               "4,60,test\n" +
+                               "5,120,test\n" +
+                               "6,210,test\n";
        }
-       
-       @Parameters
-       public static Collection<Object[]> getConfigurations() throws 
FileNotFoundException, IOException {
 
-               LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
+       public static class KeySelector4 implements KeySelector<CustomType, 
Integer> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public Integer getKey(CustomType in) {
+                       return in.myInt;
+               }
+       }
 
-               for(int i=1; i <= NUM_PROGRAMS; i++) {
-                       Configuration config = new Configuration();
-                       config.setInteger("ProgramId", i);
-                       tConfigs.add(config);
+       public static class KeySelector5 implements KeySelector<CustomType, 
Integer> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public Integer getKey(CustomType in) {
+                       return in.myInt;
                }
-               
-               return toParameterList(tConfigs);
        }
-       
-       private static class CoGroupProgs {
-               
-               public static String runProgram(int progId, String resultPath) 
throws Exception {
-                       
-                       switch(progId) {
-                       case 1: {
-                               
-                               /*
-                                * CoGroup on tuples with key field selector
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-                               DataSet<Tuple2<Integer, Integer>> coGroupDs = 
ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5CoGroup());
-                               
-                               coGroupDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "1,0\n" +
-                                               "2,6\n" +
-                                               "3,24\n" +
-                                               "4,60\n" +
-                                               "5,120\n";
-                       }
-                       case 2: {
-                               
-                               /*
-                                * CoGroup on two custom type inputs with key 
extractors
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<CustomType> ds = 
CollectionDataSets.getCustomTypeDataSet(env);
-                               DataSet<CustomType> ds2 = 
CollectionDataSets.getCustomTypeDataSet(env);
-                               DataSet<CustomType> coGroupDs = 
ds.coGroup(ds2).where(new KeySelector<CustomType, Integer>() {
-                                                                       private 
static final long serialVersionUID = 1L;
-                                                                       
@Override
-                                                                       public 
Integer getKey(CustomType in) {
-                                                                               
return in.myInt;
-                                                                       }
-                                                               }).equalTo(new 
KeySelector<CustomType, Integer>() {
-                                                                       private 
static final long serialVersionUID = 1L;
-                                                                       
@Override
-                                                                       public 
Integer getKey(CustomType in) {
-                                                                               
return in.myInt;
-                                                                       }
-                                                               }).with(new 
CustomTypeCoGroup());
-                               
-                               coGroupDs.writeAsText(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "1,0,test\n" +
-                                               "2,6,test\n" +
-                                               "3,24,test\n" +
-                                               "4,60,test\n" +
-                                               "5,120,test\n" +
-                                               "6,210,test\n";
-                       }
-                       case 3: {
-                               
-                               /*
-                                * check correctness of cogroup if UDF returns 
left input objects
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-                               DataSet<Tuple3<Integer, Long, String>> ds2 = 
CollectionDataSets.get3TupleDataSet(env);
-                               DataSet<Tuple3<Integer, Long, String>> 
coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple3ReturnLeft());
-                               
-                               coGroupDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "1,1,Hi\n" +
-                                               "2,2,Hello\n" +
-                                               "3,2,Hello world\n" +
-                                               "4,3,Hello world, how are 
you?\n" +
-                                               "5,3,I am fine.\n";
-                               
-                       }
-                       case 4: {
-                               
-                               /*
-                                * check correctness of cogroup if UDF returns 
right input objects
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new 
Tuple5ReturnRight());
-                               
-                               coGroupDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "1,1,0,Hallo,1\n" +
-                                               "2,2,1,Hallo Welt,2\n" +
-                                               "2,3,2,Hallo Welt wie,1\n" +
-                                               "3,4,3,Hallo Welt wie 
gehts?,2\n" +
-                                               "3,5,4,ABC,2\n" +
-                                               "3,6,5,BCD,3\n";
-                               
-                       }
-                       case 5: {
-                               
-                               /*
-                                * Reduce with broadcast set
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Integer> intDs = 
CollectionDataSets.getIntegerDataSet(env);
-                               
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-                               DataSet<Tuple3<Integer, Integer, Integer>> 
coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new 
Tuple5CoGroupBC()).withBroadcastSet(intDs, "ints");
-                               
-                               coGroupDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "1,0,55\n" +
-                                               "2,6,55\n" +
-                                               "3,24,55\n" +
-                                               "4,60,55\n" +
-                                               "5,120,55\n";
-                       }
-                       case 6: {
-                               
-                               /*
-                                * CoGroup on a tuple input with key field 
selector and a custom type input with key extractor
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-                               DataSet<CustomType> ds2 = 
CollectionDataSets.getCustomTypeDataSet(env);
-                               DataSet<Tuple3<Integer, Long, String>> 
coGroupDs = ds.coGroup(ds2).where(2).equalTo(new KeySelector<CustomType, 
Integer>() {
-                                                                       private 
static final long serialVersionUID = 1L;
-                                                                       
@Override
-                                                                       public 
Integer getKey(CustomType in) {
-                                                                               
return in.myInt;
-                                                                       }
-                                                               }).with(new 
MixedCoGroup());
-                               
-                               coGroupDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "0,1,test\n" +
-                                               "1,2,test\n" +
-                                               "2,5,test\n" +
-                                               "3,15,test\n" +
-                                               "4,33,test\n" +
-                                               "5,63,test\n" +
-                                               "6,109,test\n" +
-                                               "7,4,test\n" + 
-                                               "8,4,test\n" + 
-                                               "9,4,test\n" + 
-                                               "10,5,test\n" + 
-                                               "11,5,test\n" + 
-                                               "12,5,test\n" + 
-                                               "13,5,test\n" +
-                                               "14,5,test\n"; 
-                                               
-                       }
-                       case 7: {
-                               
-                               /*
-                                * CoGroup on a tuple input with key field 
selector and a custom type input with key extractor
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-                               DataSet<CustomType> ds2 = 
CollectionDataSets.getCustomTypeDataSet(env);
-                               DataSet<CustomType> coGroupDs = 
ds2.coGroup(ds).where(new KeySelector<CustomType, Integer>() {
-                                                                       private 
static final long serialVersionUID = 1L;
-                                                                       
@Override
-                                                                       public 
Integer getKey(CustomType in) {
-                                                                               
return in.myInt;
-                                                                       }
-                                                               
}).equalTo(2).with(new MixedCoGroup2());
-                               
-                               coGroupDs.writeAsText(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "0,1,test\n" +
-                                               "1,2,test\n" +
-                                               "2,5,test\n" +
-                                               "3,15,test\n" +
-                                               "4,33,test\n" +
-                                               "5,63,test\n" +
-                                               "6,109,test\n" +
-                                               "7,4,test\n" + 
-                                               "8,4,test\n" + 
-                                               "9,4,test\n" + 
-                                               "10,5,test\n" + 
-                                               "11,5,test\n" + 
-                                               "12,5,test\n" + 
-                                               "13,5,test\n" +
-                                               "14,5,test\n"; 
-                               
-                       }
-                       case 8: {
-                               /*
-                                * CoGroup with multiple key fields
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
-                               DataSet<Tuple3<Integer, Long, String>> ds2 = 
CollectionDataSets.get3TupleDataSet(env);
-                               
-                               DataSet<Tuple3<Integer, Long, String>> 
coGrouped = ds1.coGroup(ds2).
-                                               
where(0,4).equalTo(0,1).with(new Tuple5Tuple3CoGroup());
-                               
-                               coGrouped.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               return "1,1,Hallo\n" +
-                                               "2,2,Hallo Welt\n" +
-                                               "3,2,Hallo Welt wie gehts?\n" +
-                                               "3,2,ABC\n" +
-                                               "5,3,HIJ\n" +
-                                               "5,3,IJK\n";
-                       }
-                       case 9: {
-                               /*
-                                * CoGroup with multiple key fields
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
-                               DataSet<Tuple3<Integer, Long, String>> ds2 = 
CollectionDataSets.get3TupleDataSet(env);
-                               
-                               DataSet<Tuple3<Integer, Long, String>> 
coGrouped = ds1.coGroup(ds2).
-                                               where(new 
KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>>() {
-                                                       private static final 
long serialVersionUID = 1L;
-                                                       
-                                                       @Override
-                                                       public Tuple2<Integer, 
Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
-                                                               return new 
Tuple2<Integer, Long>(t.f0, t.f4);
-                                                       }
-                                               }).
-                                               equalTo(new 
KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>>() {
-                                                       private static final 
long serialVersionUID = 1L;
-                                                       
-                                                       @Override
-                                                       public Tuple2<Integer, 
Long> getKey(Tuple3<Integer,Long,String> t) {
-                                                               return new 
Tuple2<Integer, Long>(t.f0, t.f1);
-                                                       }
-                                               }).with(new 
Tuple5Tuple3CoGroup());
-                               
-                               coGrouped.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               return "1,1,Hallo\n" +
-                                               "2,2,Hallo Welt\n" +
-                                               "3,2,Hallo Welt wie gehts?\n" +
-                                               "3,2,ABC\n" +
-                                               "5,3,HIJ\n" +
-                                               "5,3,IJK\n";
-                       }
-                       case 10: {
-                               /*
-                                * CoGroup on two custom type inputs using 
expression keys
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<CustomType> ds = 
CollectionDataSets.getCustomTypeDataSet(env);
-                               DataSet<CustomType> ds2 = 
CollectionDataSets.getCustomTypeDataSet(env);
-                               DataSet<CustomType> coGroupDs = 
ds.coGroup(ds2).where("myInt").equalTo("myInt").with(new CustomTypeCoGroup());
-                               
-                               coGroupDs.writeAsText(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "1,0,test\n" +
-                                               "2,6,test\n" +
-                                               "3,24,test\n" +
-                                               "4,60,test\n" +
-                                               "5,120,test\n" +
-                                               "6,210,test\n";
-                       }
-                       case 11: {
-                               /*
-                                * CoGroup on two custom type inputs using 
expression keys
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<POJO> ds = 
CollectionDataSets.getSmallPojoDataSet(env);
-                               DataSet<Tuple7<Integer, String, Integer, 
Integer, Long, String, Long>> ds2 = 
CollectionDataSets.getSmallTuplebasedDataSet(env);
-                               DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
-                                               
.where("nestedPojo.longNumber").equalTo(6).with(new CoGroupFunction<POJO, 
Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType>() {
-                                               private static final long 
serialVersionUID = 1L;
-
-                                               @Override
-                                               public void coGroup(
-                                                               Iterable<POJO> 
first,
-                                                               
Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
-                                                               
Collector<CustomType> out) throws Exception {
-                                                       for(POJO p : first) {
-                                                               
for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
-                                                                       
Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
-                                                                       
out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
-                                                               }
-                                                       }
-                                               }
-                               });
-                               coGroupDs.writeAsText(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return  "-1,20000,Flink\n" +
-                                               "-1,10000,Flink\n" +
-                                               "-1,30000,Flink\n";
-                       }
-                       case 12: {
-                               /*
-                                * CoGroup field-selector (expression keys) + 
key selector function
-                                * The key selector is unnecessary complicated 
(Tuple1) ;)
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<POJO> ds = 
CollectionDataSets.getSmallPojoDataSet(env);
-                               DataSet<Tuple7<Integer, String, Integer, 
Integer, Long, String, Long>> ds2 = 
CollectionDataSets.getSmallTuplebasedDataSet(env);
-                               DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
-                                               .where(new KeySelector<POJO, 
Tuple1<Long>>() {
-                                                       private static final 
long serialVersionUID = 1L;
-
-                                                       @Override
-                                                       public Tuple1<Long> 
getKey(POJO value)
-                                                                       throws 
Exception {
-                                                               return new 
Tuple1<Long>(value.nestedPojo.longNumber);
-                                                       }
-                                               }).equalTo(6).with(new 
CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, 
Long>, CustomType>() {
-                                                       private static final 
long serialVersionUID = 1L;
-
-                                               @Override
-                                               public void coGroup(
-                                                               Iterable<POJO> 
first,
-                                                               
Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
-                                                               
Collector<CustomType> out) throws Exception {
-                                                       for(POJO p : first) {
-                                                               
for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
-                                                                       
Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
-                                                                       
out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
-                                                               }
-                                                       }
-                                               }
-                               });
-                               coGroupDs.writeAsText(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return  "-1,20000,Flink\n" +
-                                               "-1,10000,Flink\n" +
-                                               "-1,30000,Flink\n";
+
+       @Test
+       public void testCorrectnessOfCoGroupIfUDFReturnsLeftInputObjects() 
throws Exception {
+               /*
+                * check correctness of cogroup if UDF returns left input 
objects
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> ds2 = 
CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> coGroupDs = 
ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple3ReturnLeft());
+
+               coGroupDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "1,1,Hi\n" +
+                               "2,2,Hello\n" +
+                               "3,2,Hello world\n" +
+                               "4,3,Hello world, how are you?\n" +
+                               "5,3,I am fine.\n";
+       }
+
+       @Test
+       public void testCorrectnessOfCoGroupIfUDFReturnsRightInputObjects() 
throws Exception {
+               /*
+                * check correctness of cogroup if UDF returns right input 
objects
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> coGroupDs 
= ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5ReturnRight());
+
+               coGroupDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "1,1,0,Hallo,1\n" +
+                               "2,2,1,Hallo Welt,2\n" +
+                               "2,3,2,Hallo Welt wie,1\n" +
+                               "3,4,3,Hallo Welt wie gehts?,2\n" +
+                               "3,5,4,ABC,2\n" +
+                               "3,6,5,BCD,3\n";
+       }
+
+       @Test
+       public void testCoGroupWithBroadcastSet() throws Exception {
+               /*
+                * Reduce with broadcast set
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Integer> intDs = 
CollectionDataSets.getIntegerDataSet(env);
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple3<Integer, Integer, Integer>> coGroupDs = 
ds.coGroup(ds2).where(0).equalTo(0).with(new 
Tuple5CoGroupBC()).withBroadcastSet(intDs, "ints");
+
+               coGroupDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "1,0,55\n" +
+                               "2,6,55\n" +
+                               "3,24,55\n" +
+                               "4,60,55\n" +
+                               "5,120,55\n";
+       }
+
+       @Test
+       public void 
testCoGroupOnATupleInputWithKeyFieldSelectorAndACustomTypeInputWithKeyExtractor()
+       throws Exception {
+               /*
+                * CoGroup on a tuple input with key field selector and a 
custom type input with key extractor
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.get5TupleDataSet(env);
+               DataSet<CustomType> ds2 = 
CollectionDataSets.getCustomTypeDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> coGroupDs = 
ds.coGroup(ds2).where(2).equalTo(new
+                               KeySelector2()).with(new MixedCoGroup());
+
+               coGroupDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "0,1,test\n" +
+                               "1,2,test\n" +
+                               "2,5,test\n" +
+                               "3,15,test\n" +
+                               "4,33,test\n" +
+                               "5,63,test\n" +
+                               "6,109,test\n" +
+                               "7,4,test\n" +
+                               "8,4,test\n" +
+                               "9,4,test\n" +
+                               "10,5,test\n" +
+                               "11,5,test\n" +
+                               "12,5,test\n" +
+                               "13,5,test\n" +
+                               "14,5,test\n";
+       }
+
+       public static class KeySelector2 implements KeySelector<CustomType, 
Integer> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public Integer getKey(CustomType in) {
+                       return in.myInt;
+               }
+       }
+
+       @Test
+       public void 
testCoGroupOnACustomTypeWithKeyExtractorAndATupleInputWithKeyFieldSelector()
+                       throws Exception {
+               /*
+                * CoGroup on a tuple input with key field selector and a 
custom type input with key extractor
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.get5TupleDataSet(env);
+               DataSet<CustomType> ds2 = 
CollectionDataSets.getCustomTypeDataSet(env);
+               DataSet<CustomType> coGroupDs = ds2.coGroup(ds).where(new 
KeySelector3()).equalTo(2).with
+                               (new MixedCoGroup2());
+
+               coGroupDs.writeAsText(resultPath);
+               env.execute();
+
+               expected = "0,1,test\n" +
+                               "1,2,test\n" +
+                               "2,5,test\n" +
+                               "3,15,test\n" +
+                               "4,33,test\n" +
+                               "5,63,test\n" +
+                               "6,109,test\n" +
+                               "7,4,test\n" +
+                               "8,4,test\n" +
+                               "9,4,test\n" +
+                               "10,5,test\n" +
+                               "11,5,test\n" +
+                               "12,5,test\n" +
+                               "13,5,test\n" +
+                               "14,5,test\n";
+
+       }
+
+       public static class KeySelector3 implements KeySelector<CustomType, 
Integer> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public Integer getKey(CustomType in) {
+                       return in.myInt;
+               }
+       }
+
+       @Test
+       public void testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws 
Exception {
+               /*
+                * CoGroup with multiple key fields
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = 
CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> ds2 = 
CollectionDataSets.get3TupleDataSet(env);
+
+               DataSet<Tuple3<Integer, Long, String>> coGrouped = 
ds1.coGroup(ds2).
+                               where(0,4).equalTo(0,1).with(new 
Tuple5Tuple3CoGroup());
+
+               coGrouped.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "1,1,Hallo\n" +
+                               "2,2,Hallo Welt\n" +
+                               "3,2,Hallo Welt wie gehts?\n" +
+                               "3,2,ABC\n" +
+                               "5,3,HIJ\n" +
+                               "5,3,IJK\n";
+       }
+
+       @Test
+       public void testCoGroupWithMultipleKeyFieldsWithKeyExtractor() throws 
Exception {
+               /*
+                * CoGroup with multiple key fields
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = 
CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> ds2 = 
CollectionDataSets.get3TupleDataSet(env);
+
+               DataSet<Tuple3<Integer, Long, String>> coGrouped = 
ds1.coGroup(ds2).
+                               where(new KeySelector7()).
+                               equalTo(new KeySelector8()).with(new 
Tuple5Tuple3CoGroup());
+
+               coGrouped.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "1,1,Hallo\n" +
+                               "2,2,Hallo Welt\n" +
+                               "3,2,Hallo Welt wie gehts?\n" +
+                               "3,2,ABC\n" +
+                               "5,3,HIJ\n" +
+                               "5,3,IJK\n";
+       }
+
+       public static class KeySelector7 implements 
KeySelector<Tuple5<Integer,Long,Integer,String,Long>,
+       Tuple2<Integer, Long>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple2<Integer, Long> 
getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
+                       return new Tuple2<Integer, Long>(t.f0, t.f4);
+               }
+       }
+
+       public static class KeySelector8 implements 
KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> 
t) {
+                       return new Tuple2<Integer, Long>(t.f0, t.f1);
+               }
+       }
+
+       @Test
+       public void testCoGroupTwoCustomTypeInputsWithExpressionKeys() throws 
Exception {
+               /*
+                * CoGroup on two custom type inputs using expression keys
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<CustomType> ds = 
CollectionDataSets.getCustomTypeDataSet(env);
+               DataSet<CustomType> ds2 = 
CollectionDataSets.getCustomTypeDataSet(env);
+               DataSet<CustomType> coGroupDs = 
ds.coGroup(ds2).where("myInt").equalTo("myInt").with(new CustomTypeCoGroup());
+
+               coGroupDs.writeAsText(resultPath);
+               env.execute();
+
+               expected = "1,0,test\n" +
+                               "2,6,test\n" +
+                               "3,24,test\n" +
+                               "4,60,test\n" +
+                               "5,120,test\n" +
+                               "6,210,test\n";
+       }
+
+       @Test
+       public void 
testCoGroupOnTwoCustomTypeInputsWithExpressionKeyAndFieldSelector() throws
+                       Exception {
+               /*
+                * CoGroup on two custom type inputs using expression keys
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
+               DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, 
Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+               DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
+                               
.where("nestedPojo.longNumber").equalTo(6).with(new CoGroup1());
+               coGroupDs.writeAsText(resultPath);
+               env.execute();
+
+               expected =      "-1,20000,Flink\n" +
+                               "-1,10000,Flink\n" +
+                               "-1,30000,Flink\n";
+       }
+
+       public static class CoGroup1 implements CoGroupFunction<POJO, 
Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void coGroup(
+                               Iterable<POJO> first,
+                               Iterable<Tuple7<Integer, String, Integer, 
Integer, Long, String, Long>> second,
+                               Collector<CustomType> out) throws Exception {
+                       for(POJO p : first) {
+                               for(Tuple7<Integer, String, Integer, Integer, 
Long, String, Long> t: second) {
+                                       
Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
+                                       out.collect(new CustomType(-1, 
p.nestedPojo.longNumber, "Flink"));
+                               }
                        }
-                       case 13: {
-                               /*
-                                * CoGroup field-selector (expression keys) + 
key selector function
-                                * The key selector is simple here
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<POJO> ds = 
CollectionDataSets.getSmallPojoDataSet(env);
-                               DataSet<Tuple7<Integer, String, Integer, 
Integer, Long, String, Long>> ds2 = 
CollectionDataSets.getSmallTuplebasedDataSet(env);
-                               DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
-                                               .where(new KeySelector<POJO, 
Long>() {
-                                                       private static final 
long serialVersionUID = 1L;
-
-                                                       @Override
-                                                       public Long getKey(POJO 
value)
-                                                                       throws 
Exception {
-                                                               return 
value.nestedPojo.longNumber;
-                                                       }
-                                               }).equalTo(6).with(new 
CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, 
Long>, CustomType>() {
-                                                       private static final 
long serialVersionUID = 1L;
-
-                                               @Override
-                                               public void coGroup(
-                                                               Iterable<POJO> 
first,
-                                                               
Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
-                                                               
Collector<CustomType> out) throws Exception {
-                                                       for(POJO p : first) {
-                                                               
for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
-                                                                       
Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
-                                                                       
out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
-                                                               }
-                                                       }
-                                               }
-                               });
-                               coGroupDs.writeAsText(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return  "-1,20000,Flink\n" +
-                                               "-1,10000,Flink\n" +
-                                               "-1,30000,Flink\n";
+               }
+       }
+
+       @Test
+       public void testCoGroupFieldSelectorAndComplicatedKeySelector() throws 
Exception {
+               /*
+                * CoGroup field-selector (expression keys) + key selector 
function
+                * The key selector is unnecessary complicated (Tuple1) ;)
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
+               DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, 
Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+               DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
+                               .where(new KeySelector6()).equalTo(6).with(new 
CoGroup3());
+               coGroupDs.writeAsText(resultPath);
+               env.execute();
+
+               expected =      "-1,20000,Flink\n" +
+                               "-1,10000,Flink\n" +
+                               "-1,30000,Flink\n";
+
+       }
+
+       public static class KeySelector6 implements KeySelector<POJO, 
Tuple1<Long>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple1<Long> getKey(POJO value)
+               throws Exception {
+                       return new Tuple1<Long>(value.nestedPojo.longNumber);
+               }
+       }
+
+       public static class CoGroup3 implements CoGroupFunction<POJO, 
Tuple7<Integer,
+                       String, Integer, Integer, Long, String, Long>, 
CustomType> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void coGroup(
+                               Iterable<POJO> first,
+                               Iterable<Tuple7<Integer, String, Integer, 
Integer, Long, String, Long>> second,
+                               Collector<CustomType> out) throws Exception {
+                       for(POJO p : first) {
+                               for(Tuple7<Integer, String, Integer, Integer, 
Long, String, Long> t: second) {
+                                       
Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
+                                       out.collect(new CustomType(-1, 
p.nestedPojo.longNumber, "Flink"));
+                               }
                        }
-                       
-                       default: 
-                               throw new IllegalArgumentException("Invalid 
program id");
+               }
+       }
+
+       @Test
+       public void testCoGroupFieldSelectorAndKeySelector() throws Exception {
+               /*
+                * CoGroup field-selector (expression keys) + key selector 
function
+                * The key selector is simple here
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
+               DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, 
Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+               DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
+                               .where(new KeySelector1()).equalTo(6).with(new 
CoGroup2());
+               coGroupDs.writeAsText(resultPath);
+               env.execute();
+
+               expected = "-1,20000,Flink\n" +
+                               "-1,10000,Flink\n" +
+                               "-1,30000,Flink\n";
+       }
+
+       public static class KeySelector1 implements KeySelector<POJO, Long> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Long getKey(POJO value)
+               throws Exception {
+                       return value.nestedPojo.longNumber;
+               }
+       }
+
+       public static class CoGroup2 implements CoGroupFunction<POJO, 
Tuple7<Integer, String,
+                       Integer, Integer, Long, String, Long>, CustomType> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void coGroup(
+                               Iterable<POJO> first,
+                               Iterable<Tuple7<Integer, String, Integer, 
Integer, Long, String, Long>> second,
+                               Collector<CustomType> out) throws Exception {
+                       for(POJO p : first) {
+                               for(Tuple7<Integer, String, Integer, Integer, 
Long, String, Long> t: second) {
+                                       
Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
+                                       out.collect(new CustomType(-1, 
p.nestedPojo.longNumber, "Flink"));
+                               }
                        }
-                       
                }
-       
        }
-       
+
        public static class Tuple5CoGroup implements 
CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, 
Long, Integer, String, Long>, Tuple2<Integer, Integer>> {
 
                private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
index 7d79ea5..bd32bfc 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
@@ -18,10 +18,7 @@
 
 package org.apache.flink.test.javaApiOperators;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.util.Collection;
-import java.util.LinkedList;
 
 import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.functions.RichCrossFunction;
@@ -32,371 +29,336 @@ import org.apache.flink.api.java.tuple.Tuple6;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
 @RunWith(Parameterized.class)
-public class CrossITCase extends JavaProgramTestBase {
-       
-       private static int NUM_PROGRAMS = 11;
-       
-       private int curProgId = config.getInteger("ProgramId", -1);
+public class CrossITCase extends MultipleProgramsTestBase {
+
+       public CrossITCase(ExecutionMode mode){
+               super(mode);
+       }
+
        private String resultPath;
-       private String expectedResult;
-       
-       public CrossITCase(Configuration config) {
-               super(config);
+       private String expected;
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception{
+               resultPath = tempFolder.newFile().toURI().toString();
        }
-       
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
+
+       @After
+       public void after() throws Exception{
+               compareResultsByLinesInMemory(expected, resultPath);
        }
 
-       @Override
-       protected void testProgram() throws Exception {
-               expectedResult = CrossProgs.runProgram(curProgId, resultPath);
+       @Test
+       public void testCorretnessOfCrossOnTwoTupleInputs() throws Exception {
+               /*
+                * check correctness of cross on two tuple inputs
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple2<Integer, String>> crossDs = 
ds.cross(ds2).with(new Tuple5Cross());
+
+               crossDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "0,HalloHallo\n" +
+                               "1,HalloHallo Welt\n" +
+                               "2,HalloHallo Welt wie\n" +
+                               "1,Hallo WeltHallo\n" +
+                               "2,Hallo WeltHallo Welt\n" +
+                               "3,Hallo WeltHallo Welt wie\n" +
+                               "2,Hallo Welt wieHallo\n" +
+                               "3,Hallo Welt wieHallo Welt\n" +
+                               "4,Hallo Welt wieHallo Welt wie\n";
        }
-       
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(expectedResult, resultPath);
+
+       @Test
+       public void testCorrectnessOfCrossIfUDFReturnsLeftInputObject() throws 
Exception {
+               /*
+                * check correctness of cross if UDF returns left input object
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> crossDs = 
ds.cross(ds2).with(new Tuple3ReturnLeft());
+
+               crossDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "1,1,Hi\n" +
+                               "1,1,Hi\n" +
+                               "1,1,Hi\n" +
+                               "2,2,Hello\n" +
+                               "2,2,Hello\n" +
+                               "2,2,Hello\n" +
+                               "3,2,Hello world\n" +
+                               "3,2,Hello world\n" +
+                               "3,2,Hello world\n";
        }
-       
-       @Parameters
-       public static Collection<Object[]> getConfigurations() throws 
FileNotFoundException, IOException {
 
-               LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
+       @Test
+       public void testCorrectnessOfCrossIfUDFReturnsRightInputObject() throws 
Exception {
+               /*
+                * check correctness of cross if UDF returns right input object
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> crossDs = 
ds.cross(ds2).with(new Tuple5ReturnRight());
+
+               crossDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "1,1,0,Hallo,1\n" +
+                               "1,1,0,Hallo,1\n" +
+                               "1,1,0,Hallo,1\n" +
+                               "2,2,1,Hallo Welt,2\n" +
+                               "2,2,1,Hallo Welt,2\n" +
+                               "2,2,1,Hallo Welt,2\n" +
+                               "2,3,2,Hallo Welt wie,1\n" +
+                               "2,3,2,Hallo Welt wie,1\n" +
+                               "2,3,2,Hallo Welt wie,1\n";
 
-               for(int i=1; i <= NUM_PROGRAMS; i++) {
-                       Configuration config = new Configuration();
-                       config.setInteger("ProgramId", i);
-                       tConfigs.add(config);
-               }
-               
-               return toParameterList(tConfigs);
        }
-       
-       private static class CrossProgs {
-               
-               public static String runProgram(int progId, String resultPath) 
throws Exception {
-                       
-                       switch(progId) {
-                       case 1: {
-                               
-                               /*
-                                * check correctness of cross on two tuple 
inputs 
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-                               DataSet<Tuple2<Integer, String>> crossDs = 
ds.cross(ds2).with(new Tuple5Cross());
-                               
-                               crossDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "0,HalloHallo\n" +
-                                               "1,HalloHallo Welt\n" +
-                                               "2,HalloHallo Welt wie\n" +
-                                               "1,Hallo WeltHallo\n" +
-                                               "2,Hallo WeltHallo Welt\n" +
-                                               "3,Hallo WeltHallo Welt wie\n" +
-                                               "2,Hallo Welt wieHallo\n" +
-                                               "3,Hallo Welt wieHallo Welt\n" +
-                                               "4,Hallo Welt wieHallo Welt 
wie\n";
-                       }
-                       case 2: {
-                               
-                               /*
-                                * check correctness of cross if UDF returns 
left input object
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.getSmall3TupleDataSet(env);
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-                               DataSet<Tuple3<Integer, Long, String>> crossDs 
= ds.cross(ds2).with(new Tuple3ReturnLeft());
-                               
-                               crossDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "1,1,Hi\n" +
-                                               "1,1,Hi\n" +
-                                               "1,1,Hi\n" +
-                                               "2,2,Hello\n" +
-                                               "2,2,Hello\n" +
-                                               "2,2,Hello\n" +
-                                               "3,2,Hello world\n" +
-                                               "3,2,Hello world\n" +
-                                               "3,2,Hello world\n";
-                               
-                       }
-                       case 3: {
-                               
-                               /*
-                                * check correctness of cross if UDF returns 
right input object
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.getSmall3TupleDataSet(env);
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> crossDs = ds.cross(ds2).with(new Tuple5ReturnRight());
-                               
-                               crossDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "1,1,0,Hallo,1\n" +
-                                               "1,1,0,Hallo,1\n" +
-                                               "1,1,0,Hallo,1\n" +
-                                               "2,2,1,Hallo Welt,2\n" +
-                                               "2,2,1,Hallo Welt,2\n" +
-                                               "2,2,1,Hallo Welt,2\n" +
-                                               "2,3,2,Hallo Welt wie,1\n" +
-                                               "2,3,2,Hallo Welt wie,1\n" +
-                                               "2,3,2,Hallo Welt wie,1\n";
-                               
-                       }
-                       case 4: {
-                               
-                               /*
-                                * check correctness of cross with broadcast set
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Integer> intDs = 
CollectionDataSets.getIntegerDataSet(env);
-                               
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-                               DataSet<Tuple3<Integer, Integer, Integer>> 
crossDs = ds.cross(ds2).with(new Tuple5CrossBC()).withBroadcastSet(intDs, 
"ints");
-                               
-                               crossDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "2,0,55\n" +
-                                               "3,0,55\n" +
-                                               "3,0,55\n" +
-                                               "3,0,55\n" +
-                                               "4,1,55\n" +
-                                               "4,2,55\n" +
-                                               "3,0,55\n" +
-                                               "4,2,55\n" +
-                                               "4,4,55\n";
-                       }
-                       case 5: {
-                               
-                               /*
-                                * check correctness of crossWithHuge (only 
correctness of result -> should be the same as with normal cross)
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-                               DataSet<Tuple2<Integer, String>> crossDs = 
ds.crossWithHuge(ds2).with(new Tuple5Cross());
-                               
-                               crossDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "0,HalloHallo\n" +
-                                               "1,HalloHallo Welt\n" +
-                                               "2,HalloHallo Welt wie\n" +
-                                               "1,Hallo WeltHallo\n" +
-                                               "2,Hallo WeltHallo Welt\n" +
-                                               "3,Hallo WeltHallo Welt wie\n" +
-                                               "2,Hallo Welt wieHallo\n" +
-                                               "3,Hallo Welt wieHallo Welt\n" +
-                                               "4,Hallo Welt wieHallo Welt 
wie\n";
-                               
-                       }
-                       case 6: {
-                               
-                               /*
-                                * check correctness of crossWithTiny (only 
correctness of result -> should be the same as with normal cross)
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-                               DataSet<Tuple2<Integer, String>> crossDs = 
ds.crossWithTiny(ds2).with(new Tuple5Cross());
-                               
-                               crossDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "0,HalloHallo\n" +
-                                               "1,HalloHallo Welt\n" +
-                                               "2,HalloHallo Welt wie\n" +
-                                               "1,Hallo WeltHallo\n" +
-                                               "2,Hallo WeltHallo Welt\n" +
-                                               "3,Hallo WeltHallo Welt wie\n" +
-                                               "2,Hallo Welt wieHallo\n" +
-                                               "3,Hallo Welt wieHallo Welt\n" +
-                                               "4,Hallo Welt wieHallo Welt 
wie\n";
-                               
-                       }
-                       case 7: {
 
-                       /*
-                        * project cross on a tuple input 1
-                        */
+       @Test
+       public void testCorrectnessOfCrossWithBroadcastSet() throws Exception {
+               /*
+                * check correctness of cross with broadcast set
+                */
 
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-                               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.getSmall3TupleDataSet(env);
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-                               DataSet<Tuple6<String, Long, String, Integer, 
Long, Long>> crossDs = ds.cross(ds2)
-                                       .projectFirst(2, 1)
-                                       .projectSecond(3)
-                                       .projectFirst(0)
-                                       .projectSecond(4,1);
+               DataSet<Integer> intDs = 
CollectionDataSets.getIntegerDataSet(env);
 
-                               crossDs.writeAsCsv(resultPath);
-                               env.execute();
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple3<Integer, Integer, Integer>> crossDs = 
ds.cross(ds2).with(new Tuple5CrossBC()).withBroadcastSet(intDs, "ints");
 
-                               // return expected result
-                               return "Hi,1,Hallo,1,1,1\n" +
-                                       "Hi,1,Hallo Welt,1,2,2\n" +
-                                       "Hi,1,Hallo Welt wie,1,1,3\n" +
-                                       "Hello,2,Hallo,2,1,1\n" +
-                                       "Hello,2,Hallo Welt,2,2,2\n" +
-                                       "Hello,2,Hallo Welt wie,2,1,3\n" +
-                                       "Hello world,2,Hallo,3,1,1\n" +
-                                       "Hello world,2,Hallo Welt,3,2,2\n" +
-                                       "Hello world,2,Hallo Welt wie,3,1,3\n";
+               crossDs.writeAsCsv(resultPath);
+               env.execute();
 
-                       }
-                       case 8: {
+               expected = "2,0,55\n" +
+                               "3,0,55\n" +
+                               "3,0,55\n" +
+                               "3,0,55\n" +
+                               "4,1,55\n" +
+                               "4,2,55\n" +
+                               "3,0,55\n" +
+                               "4,2,55\n" +
+                               "4,4,55\n";
+       }
+
+       @Test
+       public void testCorrectnessOfCrossWithHuge() throws Exception {
+               /*
+                * check correctness of crossWithHuge (only correctness of 
result -> should be the same as with normal cross)
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple2<Integer, String>> crossDs = 
ds.crossWithHuge(ds2).with(new Tuple5Cross());
+
+               crossDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "0,HalloHallo\n" +
+                               "1,HalloHallo Welt\n" +
+                               "2,HalloHallo Welt wie\n" +
+                               "1,Hallo WeltHallo\n" +
+                               "2,Hallo WeltHallo Welt\n" +
+                               "3,Hallo WeltHallo Welt wie\n" +
+                               "2,Hallo Welt wieHallo\n" +
+                               "3,Hallo Welt wieHallo Welt\n" +
+                               "4,Hallo Welt wieHallo Welt wie\n";
+       }
+
+       @Test
+       public void testCorrectnessOfCrossWithTiny() throws Exception {
+               /*
+                * check correctness of crossWithTiny (only correctness of 
result -> should be the same as with normal cross)
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple2<Integer, String>> crossDs = 
ds.crossWithTiny(ds2).with(new Tuple5Cross());
+
+               crossDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "0,HalloHallo\n" +
+                               "1,HalloHallo Welt\n" +
+                               "2,HalloHallo Welt wie\n" +
+                               "1,Hallo WeltHallo\n" +
+                               "2,Hallo WeltHallo Welt\n" +
+                               "3,Hallo WeltHallo Welt wie\n" +
+                               "2,Hallo Welt wieHallo\n" +
+                               "3,Hallo Welt wieHallo Welt\n" +
+                               "4,Hallo Welt wieHallo Welt wie\n";
+       }
 
-                       /*
-                        * project cross on a tuple input 2
-                        */
+       @Test
+       public void testProjectCrossOnATupleInput1() throws Exception{
+               /*
+                * project cross on a tuple input 1
+                */
 
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-                                       DataSet<Tuple3<Integer, Long, String>> 
ds = CollectionDataSets.getSmall3TupleDataSet(env);
-                                       DataSet<Tuple5<Integer, Long, Integer, 
String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-                                       DataSet<Tuple6<String, String, Long, 
Long, Long,Integer>> crossDs = ds.cross(ds2)
-                                               .projectSecond(3)
-                                               .projectFirst(2, 1)
-                                               .projectSecond(4,1)
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple6<String, Long, String, Integer, Long, Long>> 
crossDs = ds.cross(ds2)
+                               .projectFirst(2, 1)
+                               .projectSecond(3)
+                               .projectFirst(0)
+                                       .projectSecond(4,1);
+
+               crossDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "Hi,1,Hallo,1,1,1\n" +
+                               "Hi,1,Hallo Welt,1,2,2\n" +
+                               "Hi,1,Hallo Welt wie,1,1,3\n" +
+                               "Hello,2,Hallo,2,1,1\n" +
+                               "Hello,2,Hallo Welt,2,2,2\n" +
+                               "Hello,2,Hallo Welt wie,2,1,3\n" +
+                               "Hello world,2,Hallo,3,1,1\n" +
+                               "Hello world,2,Hallo Welt,3,2,2\n" +
+                               "Hello world,2,Hallo Welt wie,3,1,3\n";
+       }
+
+       @Test
+       public void testProjectCrossOnATupleInput2() throws Exception {
+               /*
+                * project cross on a tuple input 2
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple6<String, String, Long, Long, Long,Integer>> 
crossDs = ds.cross(ds2)
+                               .projectSecond(3)
+                               .projectFirst(2, 1)
+                               .projectSecond(4,1)
                                                .projectFirst(0);
 
-                                       crossDs.writeAsCsv(resultPath);
-                                       env.execute();
+               crossDs.writeAsCsv(resultPath);
+               env.execute();
 
-                                       // return expected result
-                                       return "Hallo,Hi,1,1,1,1\n" +
-                                               "Hallo Welt,Hi,1,2,2,1\n" +
-                                               "Hallo Welt wie,Hi,1,1,3,1\n" +
-                                               "Hallo,Hello,2,1,1,2\n" +
-                                               "Hallo Welt,Hello,2,2,2,2\n" +
-                                               "Hallo Welt 
wie,Hello,2,1,3,2\n" +
-                                               "Hallo,Hello world,2,1,1,3\n" +
-                                               "Hallo Welt,Hello 
world,2,2,2,3\n" +
-                                               "Hallo Welt wie,Hello 
world,2,1,3,3\n";
+               expected = "Hallo,Hi,1,1,1,1\n" +
+                               "Hallo Welt,Hi,1,2,2,1\n" +
+                               "Hallo Welt wie,Hi,1,1,3,1\n" +
+                               "Hallo,Hello,2,1,1,2\n" +
+                               "Hallo Welt,Hello,2,2,2,2\n" +
+                               "Hallo Welt wie,Hello,2,1,3,2\n" +
+                               "Hallo,Hello world,2,1,1,3\n" +
+                               "Hallo Welt,Hello world,2,2,2,3\n" +
+                               "Hallo Welt wie,Hello world,2,1,3,3\n";
 
-                       }
-                       case 9: {
-                               /*
-                                * check correctness of default cross
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.getSmall3TupleDataSet(env);
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-                               DataSet<Tuple2<Tuple3<Integer, Long, String>, 
Tuple5<Integer, Long, Integer, String, Long>>> crossDs = ds.cross(ds2);
-                               
-                               crossDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "(1,1,Hi),(2,2,1,Hallo Welt,2)\n" +
-                                               "(1,1,Hi),(1,1,0,Hallo,1)\n" +
-                                               "(1,1,Hi),(2,3,2,Hallo Welt 
wie,1)\n" +
-                                               "(2,2,Hello),(2,2,1,Hallo 
Welt,2)\n" +
-                                               "(2,2,Hello),(1,1,0,Hallo,1)\n" 
+
-                                               "(2,2,Hello),(2,3,2,Hallo Welt 
wie,1)\n" +
-                                               "(3,2,Hello world),(2,2,1,Hallo 
Welt,2)\n" +
-                                               "(3,2,Hello 
world),(1,1,0,Hallo,1)\n" +
-                                               "(3,2,Hello world),(2,3,2,Hallo 
Welt wie,1)\n";
-                               
-                       }
+       }
 
-                       case 10: {
-                               
-                               /*
-                                * check correctness of cross on two custom 
type inputs
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<CustomType> ds = 
CollectionDataSets.getSmallCustomTypeDataSet(env);
-                               DataSet<CustomType> ds2 = 
CollectionDataSets.getSmallCustomTypeDataSet(env);
-                               DataSet<CustomType> crossDs = 
ds.cross(ds2).with(new CustomTypeCross());
-                               
-                               crossDs.writeAsText(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "1,0,HiHi\n"
-                                               + "2,1,HiHello\n"
-                                               + "2,2,HiHello world\n"
-                                               + "2,1,HelloHi\n"
-                                               + "4,2,HelloHello\n"
-                                               + "4,3,HelloHello world\n"
-                                               + "2,2,Hello worldHi\n"
-                                               + "4,3,Hello worldHello\n"
-                                               + "4,4,Hello worldHello world";
-                       }
-                       
-                       case 11: {
-                               
-                               /*
-                                * check correctness of cross a tuple input and 
a custom type input
-                                */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
-                               DataSet<CustomType> ds2 = 
CollectionDataSets.getSmallCustomTypeDataSet(env);
-                               DataSet<Tuple3<Integer, Long, String>> crossDs 
= ds.cross(ds2).with(new MixedCross());
-                               
-                               crossDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "2,0,HalloHi\n" +
-                                               "3,0,HalloHello\n" +
-                                               "3,0,HalloHello world\n" +
-                                               "3,0,Hallo WeltHi\n" +
-                                               "4,1,Hallo WeltHello\n" +
-                                               "4,2,Hallo WeltHello world\n" +
-                                               "3,0,Hallo Welt wieHi\n" +
-                                               "4,2,Hallo Welt wieHello\n" +
-                                               "4,4,Hallo Welt wieHello 
world\n";
-                               
-                       }
-                       default: 
-                               throw new IllegalArgumentException("Invalid 
program id");
-                       }
-                       
-               }
-       
+       @Test
+       public void testCorrectnessOfDefaultCross() throws Exception {
+               /*
+                * check correctness of default cross
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple2<Tuple3<Integer, Long, String>, Tuple5<Integer, 
Long, Integer, String, Long>>> crossDs = ds.cross(ds2);
+
+               crossDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "(1,1,Hi),(2,2,1,Hallo Welt,2)\n" +
+                               "(1,1,Hi),(1,1,0,Hallo,1)\n" +
+                               "(1,1,Hi),(2,3,2,Hallo Welt wie,1)\n" +
+                               "(2,2,Hello),(2,2,1,Hallo Welt,2)\n" +
+                               "(2,2,Hello),(1,1,0,Hallo,1)\n" +
+                               "(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n" +
+                               "(3,2,Hello world),(2,2,1,Hallo Welt,2)\n" +
+                               "(3,2,Hello world),(1,1,0,Hallo,1)\n" +
+                               "(3,2,Hello world),(2,3,2,Hallo Welt wie,1)\n";
+       }
+
+       @Test
+       public void testCorrectnessOfCrossOnTwoCustomTypeInputs() throws 
Exception {
+               /*
+                * check correctness of cross on two custom type inputs
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<CustomType> ds = 
CollectionDataSets.getSmallCustomTypeDataSet(env);
+               DataSet<CustomType> ds2 = 
CollectionDataSets.getSmallCustomTypeDataSet(env);
+               DataSet<CustomType> crossDs = ds.cross(ds2).with(new 
CustomTypeCross());
+
+               crossDs.writeAsText(resultPath);
+               env.execute();
+
+               expected = "1,0,HiHi\n"
+                               + "2,1,HiHello\n"
+                               + "2,2,HiHello world\n"
+                               + "2,1,HelloHi\n"
+                               + "4,2,HelloHello\n"
+                               + "4,3,HelloHello world\n"
+                               + "2,2,Hello worldHi\n"
+                               + "4,3,Hello worldHello\n"
+                               + "4,4,Hello worldHello world";
+       }
+
+       @Test
+       public void testCorrectnessOfCrossATupleInputAndACustomTypeInput() 
throws Exception {
+               /*
+                * check correctness of cross a tuple input and a custom type 
input
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<CustomType> ds2 = 
CollectionDataSets.getSmallCustomTypeDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> crossDs = 
ds.cross(ds2).with(new MixedCross());
+
+               crossDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "2,0,HalloHi\n" +
+                               "3,0,HalloHello\n" +
+                               "3,0,HalloHello world\n" +
+                               "3,0,Hallo WeltHi\n" +
+                               "4,1,Hallo WeltHello\n" +
+                               "4,2,Hallo WeltHello world\n" +
+                               "3,0,Hallo Welt wieHi\n" +
+                               "4,2,Hallo Welt wieHello\n" +
+                               "4,4,Hallo Welt wieHello world\n";
        }
        
        public static class Tuple5Cross implements 
CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, 
Long, Integer, String, Long>, Tuple2<Integer, String>> {

Reply via email to