http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 8994ba9..aed7007 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -18,11 +18,8 @@
 
 package org.apache.flink.test.javaApiOperators;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.LinkedList;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.Order;
@@ -41,729 +38,739 @@ import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.FromTuple;
 import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.FromTupleWithCTor;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
 import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.PojoContainingTupleAndWritable;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
+import org.hamcrest.core.IsEqual;
+import org.hamcrest.core.IsNot;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
 @SuppressWarnings("serial")
 @RunWith(Parameterized.class)
-public class GroupReduceITCase extends JavaProgramTestBase {
-       
-       private static int NUM_PROGRAMS = 28;
-       
-       private int curProgId = config.getInteger("ProgramId", -1);
-       private String resultPath;
-       private String expectedResult;
-       
-       public GroupReduceITCase(Configuration config) {
-               super(config);
-       }
-       
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
+public class GroupReduceITCase extends MultipleProgramsTestBase {
 
-       @Override
-       protected void testProgram() throws Exception {
-               expectedResult = GroupReduceProgs.runProgram(curProgId, 
resultPath, isCollectionExecution());
-       }
-       
-       @Override
-       protected void postSubmit() throws Exception {
-               if (expectedResult != null) {
-                       compareResultsByLinesInMemory(expectedResult, 
resultPath);
-               }
+       public GroupReduceITCase(ExecutionMode mode){
+               super(mode);
        }
-       
-       @Parameters
-       public static Collection<Object[]> getConfigurations() throws 
FileNotFoundException, IOException {
 
-               LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
+       private String resultPath;
+       private String expected;
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception{
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
 
-               for(int i=1; i <= NUM_PROGRAMS; i++) {
-                       Configuration config = new Configuration();
-                       config.setInteger("ProgramId", i);
-                       tConfigs.add(config);
+       @After
+       public void after() throws Exception{
+               if(expected != null) {
+                       compareResultsByLinesInMemory(expected, resultPath);
                }
-               
-               return toParameterList(tConfigs);
        }
-       
-       @SuppressWarnings("unused")
-       private static class GroupReduceProgs {
-               
-               public static String runProgram(int progId, String resultPath, 
boolean collectionExecution) throws Exception {
 
-                       switch (progId) {
-                               case 1: {
-                               
-                               /*
+       @Test
+       public void testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelector() 
throws Exception{
+               /*
                                 * check correctness of groupReduce on tuples 
with key field selector
                                 */
 
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-                                       DataSet<Tuple3<Integer, Long, String>> 
ds = CollectionDataSets.get3TupleDataSet(env);
-                                       DataSet<Tuple2<Integer, Long>> reduceDs 
= ds.
-                                                       
groupBy(1).reduceGroup(new Tuple3GroupReduce());
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple2<Integer, Long>> reduceDs = ds.
+                               groupBy(1).reduceGroup(new Tuple3GroupReduce());
 
-                                       reduceDs.writeAsCsv(resultPath);
-                                       env.execute();
+               reduceDs.writeAsCsv(resultPath);
+               env.execute();
 
-                                       // return expected result
-                                       return "1,1\n" +
-                                                       "5,2\n" +
-                                                       "15,3\n" +
-                                                       "34,4\n" +
-                                                       "65,5\n" +
-                                                       "111,6\n";
-                               }
-                               case 2: {
-                               
-                               /*
-                                * check correctness of groupReduce on tuples 
with multiple key field selector
-                                */
+               expected = "1,1\n" +
+                               "5,2\n" +
+                               "15,3\n" +
+                               "34,4\n" +
+                               "65,5\n" +
+                               "111,6\n";
+       }
 
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+       @Test
+       public void 
testCorrectnessOfGroupReduceOnTuplesWithMultipleKeyFieldSelectors() throws
+                       Exception {
+               /*
+                * check correctness of groupReduce on tuples with multiple key 
field selector
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs 
= ds.
+                               groupBy(4, 0).reduceGroup(new 
Tuple5GroupReduce());
+
+               reduceDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "1,1,0,P-),1\n" +
+                               "2,3,0,P-),1\n" +
+                               "2,2,0,P-),2\n" +
+                               "3,9,0,P-),2\n" +
+                               "3,6,0,P-),3\n" +
+                               "4,17,0,P-),1\n" +
+                               "4,17,0,P-),2\n" +
+                               "5,11,0,P-),1\n" +
+                               "5,29,0,P-),2\n" +
+                               "5,25,0,P-),3\n";
+       }
 
-                                       DataSet<Tuple5<Integer, Long, Integer, 
String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-                                       DataSet<Tuple5<Integer, Long, Integer, 
String, Long>> reduceDs = ds.
-                                                       groupBy(4, 
0).reduceGroup(new Tuple5GroupReduce());
+       @Test
+       public void 
testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelectorAndGroupSorting() throws
+                       Exception {
+               /*
+                * check correctness of groupReduce on tuples with key field 
selector and group sorting
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(1);
+
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+                               groupBy(1).sortGroup(2, 
Order.ASCENDING).reduceGroup(new Tuple3SortedGroupReduce());
+
+               reduceDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "1,1,Hi\n" +
+                               "5,2,Hello-Hello world\n" +
+                               "15,3,Hello world, how are you?-I am fine.-Luke 
Skywalker\n" +
+                               
"34,4,Comment#1-Comment#2-Comment#3-Comment#4\n" +
+                               
"65,5,Comment#5-Comment#6-Comment#7-Comment#8-Comment#9\n" +
+                               
"111,6,Comment#10-Comment#11-Comment#12-Comment#13-Comment#14-Comment#15\n";
+       }
 
-                                       reduceDs.writeAsCsv(resultPath);
-                                       env.execute();
+       @Test
+       public void testCorrectnessOfGroupReduceOnTuplesWithKeyExtractor() 
throws Exception {
+               /*
+                * check correctness of groupReduce on tuples with key extractor
+                */
 
-                                       // return expected result
-                                       return "1,1,0,P-),1\n" +
-                                                       "2,3,0,P-),1\n" +
-                                                       "2,2,0,P-),2\n" +
-                                                       "3,9,0,P-),2\n" +
-                                                       "3,6,0,P-),3\n" +
-                                                       "4,17,0,P-),1\n" +
-                                                       "4,17,0,P-),2\n" +
-                                                       "5,11,0,P-),1\n" +
-                                                       "5,29,0,P-),2\n" +
-                                                       "5,25,0,P-),3\n";
-                               }
-                               case 3: {
-                               
-                               /*
-                                * check correctness of groupReduce on tuples 
with key field selector and group sorting
-                                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                                       env.setDegreeOfParallelism(1);
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple2<Integer, Long>> reduceDs = ds.
+                               groupBy(new KeySelector1()).reduceGroup(new 
Tuple3GroupReduce());
 
-                                       DataSet<Tuple3<Integer, Long, String>> 
ds = CollectionDataSets.get3TupleDataSet(env);
-                                       DataSet<Tuple3<Integer, Long, String>> 
reduceDs = ds.
-                                                       groupBy(1).sortGroup(2, 
Order.ASCENDING).reduceGroup(new Tuple3SortedGroupReduce());
+               reduceDs.writeAsCsv(resultPath);
+               env.execute();
 
-                                       reduceDs.writeAsCsv(resultPath);
-                                       env.execute();
+               expected = "1,1\n" +
+                               "5,2\n" +
+                               "15,3\n" +
+                               "34,4\n" +
+                               "65,5\n" +
+                               "111,6\n";
+       }
 
-                                       // return expected result
-                                       return "1,1,Hi\n" +
-                                                       "5,2,Hello-Hello 
world\n" +
-                                                       "15,3,Hello world, how 
are you?-I am fine.-Luke Skywalker\n" +
-                                                       
"34,4,Comment#1-Comment#2-Comment#3-Comment#4\n" +
-                                                       
"65,5,Comment#5-Comment#6-Comment#7-Comment#8-Comment#9\n" +
-                                                       
"111,6,Comment#10-Comment#11-Comment#12-Comment#13-Comment#14-Comment#15\n";
+       public static class KeySelector1 implements KeySelector<Tuple3<Integer, 
Long, String>, Long> {
+               private static final long serialVersionUID = 1L;
 
-                               }
-                               case 4: {
-                               /*
-                                * check correctness of groupReduce on tuples 
with key extractor
-                                */
+               @Override
+               public Long getKey(Tuple3<Integer, Long, String> in) {
+                       return in.f1;
+               }
+       }
 
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+       @Test
+       public void testCorrectnessOfGroupReduceOnCustomTypeWithTypeExtractor() 
throws Exception {
+               /*
+                * check correctness of groupReduce on custom type with type 
extractor
+                */
 
-                                       DataSet<Tuple3<Integer, Long, String>> 
ds = CollectionDataSets.get3TupleDataSet(env);
-                                       DataSet<Tuple2<Integer, Long>> reduceDs 
= ds.
-                                                       groupBy(new 
KeySelector<Tuple3<Integer, Long, String>, Long>() {
-                                                               private static 
final long serialVersionUID = 1L;
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-                                                               @Override
-                                                               public Long 
getKey(Tuple3<Integer, Long, String> in) {
-                                                                       return 
in.f1;
-                                                               }
-                                                       }).reduceGroup(new 
Tuple3GroupReduce());
+               DataSet<CustomType> ds = 
CollectionDataSets.getCustomTypeDataSet(env);
+               DataSet<CustomType> reduceDs = ds.
+                               groupBy(new KeySelector2()).reduceGroup(new 
CustomTypeGroupReduce());
 
-                                       reduceDs.writeAsCsv(resultPath);
-                                       env.execute();
+               reduceDs.writeAsText(resultPath);
+               env.execute();
 
-                                       // return expected result
-                                       return "1,1\n" +
-                                                       "5,2\n" +
-                                                       "15,3\n" +
-                                                       "34,4\n" +
-                                                       "65,5\n" +
-                                                       "111,6\n";
+               expected = "1,0,Hello!\n" +
+                               "2,3,Hello!\n" +
+                               "3,12,Hello!\n" +
+                               "4,30,Hello!\n" +
+                               "5,60,Hello!\n" +
+                               "6,105,Hello!\n";
+       }
 
-                               }
-                               case 5: {
-                               
-                               /*
-                                * check correctness of groupReduce on custom 
type with type extractor
-                                */
+       public static class KeySelector2 implements KeySelector<CustomType, 
Integer> {
+               private static final long serialVersionUID = 1L;
 
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               @Override
+               public Integer getKey(CustomType in) {
+                       return in.myInt;
+               }
+       }
 
-                                       DataSet<CustomType> ds = 
CollectionDataSets.getCustomTypeDataSet(env);
-                                       DataSet<CustomType> reduceDs = ds.
-                                                       groupBy(new 
KeySelector<CustomType, Integer>() {
-                                                               private static 
final long serialVersionUID = 1L;
+       @Test
+       public void testCorrectnessOfAllGroupReduceForTuples() throws Exception 
{
+               /*
+                * check correctness of all-groupreduce for tuples
+                */
 
-                                                               @Override
-                                                               public Integer 
getKey(CustomType in) {
-                                                                       return 
in.myInt;
-                                                               }
-                                                       }).reduceGroup(new 
CustomTypeGroupReduce());
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-                                       reduceDs.writeAsText(resultPath);
-                                       env.execute();
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> reduceDs = 
ds.reduceGroup(new AllAddingTuple3GroupReduce());
 
-                                       // return expected result
-                                       return "1,0,Hello!\n" +
-                                                       "2,3,Hello!\n" +
-                                                       "3,12,Hello!\n" +
-                                                       "4,30,Hello!\n" +
-                                                       "5,60,Hello!\n" +
-                                                       "6,105,Hello!\n";
-                               }
-                               case 6: {
-                               
-                               /*
-                                * check correctness of all-groupreduce for 
tuples
-                                */
+               reduceDs.writeAsCsv(resultPath);
+               env.execute();
 
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               expected = "231,91,Hello World\n";
+       }
 
-                                       DataSet<Tuple3<Integer, Long, String>> 
ds = CollectionDataSets.get3TupleDataSet(env);
-                                       DataSet<Tuple3<Integer, Long, String>> 
reduceDs = ds.reduceGroup(new AllAddingTuple3GroupReduce());
+       @Test
+       public void testCorrectnessOfAllGroupReduceForCustomTypes() throws 
Exception {
+               /*
+                * check correctness of all-groupreduce for custom types
+                */
 
-                                       reduceDs.writeAsCsv(resultPath);
-                                       env.execute();
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-                                       // return expected result
-                                       return "231,91,Hello World\n";
-                               }
-                               case 7: {
-                               /*
-                                * check correctness of all-groupreduce for 
custom types
-                                */
+               DataSet<CustomType> ds = 
CollectionDataSets.getCustomTypeDataSet(env);
+               DataSet<CustomType> reduceDs = ds.reduceGroup(new 
AllAddingCustomTypeGroupReduce());
 
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               reduceDs.writeAsText(resultPath);
+               env.execute();
 
-                                       DataSet<CustomType> ds = 
CollectionDataSets.getCustomTypeDataSet(env);
-                                       DataSet<CustomType> reduceDs = 
ds.reduceGroup(new AllAddingCustomTypeGroupReduce());
+               expected = "91,210,Hello!";
+       }
 
-                                       reduceDs.writeAsText(resultPath);
-                                       env.execute();
+       @Test
+       public void testCorrectnessOfGroupReduceWithBroadcastSet() throws 
Exception {
+               /*
+                * check correctness of groupReduce with broadcast set
+                */
 
-                                       // return expected result
-                                       return "91,210,Hello!";
-                               }
-                               case 8: {
-                               
-                               /*
-                                * check correctness of groupReduce with 
broadcast set
-                                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Integer> intDs = 
CollectionDataSets.getIntegerDataSet(env);
 
-                                       DataSet<Integer> intDs = 
CollectionDataSets.getIntegerDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+                               groupBy(1).reduceGroup(new 
BCTuple3GroupReduce()).withBroadcastSet(intDs, "ints");
 
-                                       DataSet<Tuple3<Integer, Long, String>> 
ds = CollectionDataSets.get3TupleDataSet(env);
-                                       DataSet<Tuple3<Integer, Long, String>> 
reduceDs = ds.
-                                                       
groupBy(1).reduceGroup(new BCTuple3GroupReduce()).withBroadcastSet(intDs, 
"ints");
+               reduceDs.writeAsCsv(resultPath);
+               env.execute();
 
-                                       reduceDs.writeAsCsv(resultPath);
-                                       env.execute();
+               expected = "1,1,55\n" +
+                               "5,2,55\n" +
+                               "15,3,55\n" +
+                               "34,4,55\n" +
+                               "65,5,55\n" +
+                               "111,6,55\n";
+       }
 
-                                       // return expected result
-                                       return "1,1,55\n" +
-                                                       "5,2,55\n" +
-                                                       "15,3,55\n" +
-                                                       "34,4,55\n" +
-                                                       "65,5,55\n" +
-                                                       "111,6,55\n";
-                               }
-                               case 9: {
-                               
-                               /*
-                                * check correctness of groupReduce if UDF 
returns input objects multiple times and changes it in between
-                                */
+       @Test
+       public void
+       
testCorrectnessOfGroupReduceIfUDFReturnsInputObjectsMultipleTimesWhileChangingThem()
 throws
+                       Exception{
+               /*
+                * check correctness of groupReduce if UDF returns input 
objects multiple times and changes it in between
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+                               groupBy(1).reduceGroup(new 
InputReturningTuple3GroupReduce());
+
+               reduceDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "11,1,Hi!\n" +
+                               "21,1,Hi again!\n" +
+                               "12,2,Hi!\n" +
+                               "22,2,Hi again!\n" +
+                               "13,2,Hi!\n" +
+                               "23,2,Hi again!\n";
+       }
 
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+       @Test public void 
testCorrectnessOfGroupReduceOnCustomTypeWithKeyExtractorAndCombine() throws
+                       Exception {
+               /*
+                * check correctness of groupReduce on custom type with key 
extractor and combine
+                */
 
-                                       DataSet<Tuple3<Integer, Long, String>> 
ds = CollectionDataSets.get3TupleDataSet(env);
-                                       DataSet<Tuple3<Integer, Long, String>> 
reduceDs = ds.
-                                                       
groupBy(1).reduceGroup(new InputReturningTuple3GroupReduce());
+               org.junit.Assume.assumeThat(mode, new IsNot(new 
IsEqual<ExecutionMode>(ExecutionMode
+                               .COLLECTION)));
 
-                                       reduceDs.writeAsCsv(resultPath);
-                                       env.execute();
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-                                       // return expected result
-                                       return "11,1,Hi!\n" +
-                                                       "21,1,Hi again!\n" +
-                                                       "12,2,Hi!\n" +
-                                                       "22,2,Hi again!\n" +
-                                                       "13,2,Hi!\n" +
-                                                       "23,2,Hi again!\n";
-                               }
-                               case 10: {
-                               
-                               /*
-                                * check correctness of groupReduce on custom 
type with key extractor and combine
-                                */
+               DataSet<CustomType> ds = 
CollectionDataSets.getCustomTypeDataSet(env);
+               DataSet<CustomType> reduceDs = ds.
+                               groupBy(new KeySelector3()).reduceGroup(new 
CustomTypeGroupReduceWithCombine());
 
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               reduceDs.writeAsText(resultPath);
+               env.execute();
 
-                                       DataSet<CustomType> ds = 
CollectionDataSets.getCustomTypeDataSet(env);
-                                       DataSet<CustomType> reduceDs = ds.
-                                                       groupBy(new 
KeySelector<CustomType, Integer>() {
-                                                               private static 
final long serialVersionUID = 1L;
+               expected = "1,0,test1\n" +
+                               "2,3,test2\n" +
+                               "3,12,test3\n" +
+                               "4,30,test4\n" +
+                               "5,60,test5\n" +
+                               "6,105,test6\n";
+       }
 
-                                                               @Override
-                                                               public Integer 
getKey(CustomType in) {
-                                                                       return 
in.myInt;
-                                                               }
-                                                       }).reduceGroup(new 
CustomTypeGroupReduceWithCombine());
+       public static class KeySelector3 implements KeySelector<CustomType, 
Integer> {
+               private static final long serialVersionUID = 1L;
 
-                                       reduceDs.writeAsText(resultPath);
-                                       env.execute();
+               @Override
+               public Integer getKey(CustomType in) {
+                       return in.myInt;
+               }
+       }
 
-                                       // return expected result
-                                       if (collectionExecution) {
-                                               return null;
-
-                                       } else {
-                                               return "1,0,test1\n" +
-                                                               "2,3,test2\n" +
-                                                               "3,12,test3\n" +
-                                                               "4,30,test4\n" +
-                                                               "5,60,test5\n" +
-                                                               "6,105,test6\n";
-                                       }
-                               }
-                               case 11: {
-                               
-                               /*
-                                * check correctness of groupReduce on tuples 
with combine
-                                */
+       @Test
+       public void testCorrectnessOfGroupReduceOnTuplesWithCombine() throws 
Exception {
+               /*
+                * check correctness of groupReduce on tuples with combine
+                */
+               org.junit.Assume.assumeThat(mode, new IsNot(new 
IsEqual<ExecutionMode>(ExecutionMode
+                               .COLLECTION)));
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(2); // important because it 
determines how often the combiner is called
+
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple2<Integer, String>> reduceDs = ds.
+                               groupBy(1).reduceGroup(new 
Tuple3GroupReduceWithCombine());
+
+               reduceDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "1,test1\n" +
+                               "5,test2\n" +
+                               "15,test3\n" +
+                               "34,test4\n" +
+                               "65,test5\n" +
+                               "111,test6\n";
+       }
 
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                                       env.setDegreeOfParallelism(2); // 
important because it determines how often the combiner is called
+       @Test
+       public void testCorrectnessOfAllGroupReduceForTuplesWithCombine() 
throws Exception {
+               /*
+                * check correctness of all-groupreduce for tuples with combine
+                */
+               org.junit.Assume.assumeThat(mode, new IsNot(new 
IsEqual<ExecutionMode>(ExecutionMode
+                               .COLLECTION)));
 
-                                       DataSet<Tuple3<Integer, Long, String>> 
ds = CollectionDataSets.get3TupleDataSet(env);
-                                       DataSet<Tuple2<Integer, String>> 
reduceDs = ds.
-                                                       
groupBy(1).reduceGroup(new Tuple3GroupReduceWithCombine());
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-                                       reduceDs.writeAsCsv(resultPath);
-                                       env.execute();
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env)
+                               .map(new IdentityMapper<Tuple3<Integer, Long, 
String>>()).setParallelism(4);
 
-                                       // return expected result
-                                       if (collectionExecution) {
-                                               return null;
-
-                                       } else {
-                                               return "1,test1\n" +
-                                                               "5,test2\n" +
-                                                               "15,test3\n" +
-                                                               "34,test4\n" +
-                                                               "65,test5\n" +
-                                                               "111,test6\n";
-                                       }
-                               }
-                               // all-groupreduce with combine
-                               case 12: {
-                               
-                               /*
-                                * check correctness of all-groupreduce for 
tuples with combine
-                                */
+               Configuration cfg = new Configuration();
+               cfg.setString(PactCompiler.HINT_SHIP_STRATEGY, 
PactCompiler.HINT_SHIP_STRATEGY_REPARTITION);
+               DataSet<Tuple2<Integer, String>> reduceDs = ds.reduceGroup(new 
Tuple3AllGroupReduceWithCombine())
+                               .withParameters(cfg);
 
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               reduceDs.writeAsCsv(resultPath);
+               env.execute();
 
-                                       DataSet<Tuple3<Integer, Long, String>> 
ds = CollectionDataSets.get3TupleDataSet(env)
-                                                       .map(new 
IdentityMapper<Tuple3<Integer, Long, String>>()).setParallelism(4);
+               expected = "322," +
+                               
"testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttest\n";
+       }
 
-                                       Configuration cfg = new Configuration();
-                                       
cfg.setString(PactCompiler.HINT_SHIP_STRATEGY, 
PactCompiler.HINT_SHIP_STRATEGY_REPARTITION);
-                                       DataSet<Tuple2<Integer, String>> 
reduceDs = ds.reduceGroup(new Tuple3AllGroupReduceWithCombine())
-                                                       .withParameters(cfg);
+       @Test
+       public void testCorrectnessOfGroupreduceWithDescendingGroupSort() 
throws Exception {
+               /*
+                * check correctness of groupReduce with descending group sort
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(1);
+
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+                               groupBy(1).sortGroup(2, 
Order.DESCENDING).reduceGroup(new Tuple3SortedGroupReduce());
+
+               reduceDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "1,1,Hi\n" +
+                               "5,2,Hello world-Hello\n" +
+                               "15,3,Luke Skywalker-I am fine.-Hello world, 
how are you?\n" +
+                               
"34,4,Comment#4-Comment#3-Comment#2-Comment#1\n" +
+                               
"65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" +
+                               
"111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
+       }
 
-                                       reduceDs.writeAsCsv(resultPath);
-                                       env.execute();
+       @Test
+       public void 
testCorrectnessOfGroupReduceOnTuplesWithTupleReturningKeySelector() throws
+                       Exception {
+               /*
+                                        * check correctness of groupReduce on 
tuples with tuple-returning key selector
+                                        */
 
-                                       // return expected result
-                                       if (collectionExecution) {
-                                               return null;
-                                       } else {
-                                               return 
"322,testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttest\n";
-                                       }
-                               }
-                               case 13: {
-                               
-                               /*
-                                * check correctness of groupReduce with 
descending group sort
-                                */
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                                       env.setDegreeOfParallelism(1);
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs 
= ds.
+                               groupBy(new KeySelector4()).reduceGroup(new 
Tuple5GroupReduce());
+
+               reduceDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "1,1,0,P-),1\n" +
+                               "2,3,0,P-),1\n" +
+                               "2,2,0,P-),2\n" +
+                               "3,9,0,P-),2\n" +
+                               "3,6,0,P-),3\n" +
+                               "4,17,0,P-),1\n" +
+                               "4,17,0,P-),2\n" +
+                               "5,11,0,P-),1\n" +
+                               "5,29,0,P-),2\n" +
+                               "5,25,0,P-),3\n";
+       }
 
-                                       DataSet<Tuple3<Integer, Long, String>> 
ds = CollectionDataSets.get3TupleDataSet(env);
-                                       DataSet<Tuple3<Integer, Long, String>> 
reduceDs = ds.
-                                                       groupBy(1).sortGroup(2, 
Order.DESCENDING).reduceGroup(new Tuple3SortedGroupReduce());
+       public static class KeySelector4 implements 
KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>> {
+               private static final long serialVersionUID = 1L;
 
-                                       reduceDs.writeAsCsv(resultPath);
-                                       env.execute();
+               @Override
+               public Tuple2<Integer, Long> 
getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
+                       return new Tuple2<Integer, Long>(t.f0, t.f4);
+               }
+       }
 
-                                       // return expected result
-                                       return "1,1,Hi\n" +
-                                                       "5,2,Hello 
world-Hello\n" +
-                                                       "15,3,Luke Skywalker-I 
am fine.-Hello world, how are you?\n" +
-                                                       
"34,4,Comment#4-Comment#3-Comment#2-Comment#1\n" +
-                                                       
"65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" +
-                                                       
"111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
+       @Test
+       public void 
testInputOfCombinerIsSortedForCombinableGroupReduceWithGroupSorting() throws
+                       Exception {
+               /*
+                * check that input of combiner is also sorted for combinable 
groupReduce with group sorting
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(1);
+
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+                               groupBy(1).sortGroup(0, 
Order.ASCENDING).reduceGroup(new OrderCheckingCombinableReduce());
+
+               reduceDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "1,1,Hi\n" +
+                               "2,2,Hello\n" +
+                               "4,3,Hello world, how are you?\n" +
+                               "7,4,Comment#1\n" +
+                               "11,5,Comment#5\n" +
+                               "16,6,Comment#10\n";
+       }
 
-                               }
-                               case 14: {
-                                       /*
-                                        * check correctness of groupReduce on 
tuples with tuple-returning key selector
-                                        */
+       @Test
+       public void testDeepNesting() throws Exception {
+               /*
+                * Deep nesting test
+                * + null value in pojo
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-                                               final ExecutionEnvironment env 
= ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<CrazyNested> ds = 
CollectionDataSets.getCrazyNestedDataSet(env);
+               DataSet<Tuple2<String, Integer>> reduceDs = 
ds.groupBy("nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal")
+                               .reduceGroup(new GroupReducer1());
 
-                                               DataSet<Tuple5<Integer, Long, 
Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-                                               DataSet<Tuple5<Integer, Long, 
Integer, String, Long>> reduceDs = ds.
-                                                               groupBy(
-                                                                               
new KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, 
Long>>() {
-                                                                               
        private static final long serialVersionUID = 1L;
-                               
-                                                                               
        @Override
-                                                                               
        public Tuple2<Integer, Long> 
getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
-                                                                               
                return new Tuple2<Integer, Long>(t.f0, t.f4);
-                                                                               
        }
-                                                                               
}).reduceGroup(new Tuple5GroupReduce());
-
-                                               reduceDs.writeAsCsv(resultPath);
-                                               env.execute();
-
-                                               // return expected result
-                                               return "1,1,0,P-),1\n" +
-                                                               "2,3,0,P-),1\n" 
+
-                                                               "2,2,0,P-),2\n" 
+
-                                                               "3,9,0,P-),2\n" 
+
-                                                               "3,6,0,P-),3\n" 
+
-                                                               
"4,17,0,P-),1\n" +
-                                                               
"4,17,0,P-),2\n" +
-                                                               
"5,11,0,P-),1\n" +
-                                                               
"5,29,0,P-),2\n" +
-                                                               
"5,25,0,P-),3\n";
-                               }
-                               case 15: {
-                                       /*
-                                        * check that input of combiner is also 
sorted for combinable groupReduce with group sorting
-                                        */
+               reduceDs.writeAsCsv(resultPath);
+               env.execute();
 
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                                       env.setDegreeOfParallelism(1);
+               expected = "aa,1\nbb,2\ncc,3\n";
+       }
 
-                                       DataSet<Tuple3<Integer, Long, String>> 
ds = CollectionDataSets.get3TupleDataSet(env);
-                                       DataSet<Tuple3<Integer, Long, String>> 
reduceDs = ds.
-                                                       groupBy(1).sortGroup(0, 
Order.ASCENDING).reduceGroup(new OrderCheckingCombinableReduce());
+       public static class GroupReducer1 implements 
GroupReduceFunction<CollectionDataSets.CrazyNested, Tuple2<String, Integer>> {
+               private static final long serialVersionUID = 1L;
 
-                                       reduceDs.writeAsCsv(resultPath);
-                                       env.execute();
+               @Override
+               public void reduce(Iterable<CrazyNested> values,
+                               Collector<Tuple2<String, Integer>> out)
+               throws Exception {
+                       int c = 0; String n = null;
+                       for(CrazyNested v : values) {
+                               c++; // haha
+                               n = 
v.nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal;
+                       }
+                       out.collect(new Tuple2<String, Integer>(n,c));
+               }
+       }
 
-                                       // return expected result
-                                       return "1,1,Hi\n" +
-                                                       "2,2,Hello\n" +
-                                                       "4,3,Hello world, how 
are you?\n" +
-                                                       "7,4,Comment#1\n" +
-                                                       "11,5,Comment#5\n" +
-                                                       "16,6,Comment#10\n";
-                                       
-                               }
-                               case 16: {
-                                       /*
-                                        * Deep nesting test
-                                        * + null value in pojo
-                                        */
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                                       
-                                       DataSet<CrazyNested> ds = 
CollectionDataSets.getCrazyNestedDataSet(env);
-                                       DataSet<Tuple2<String, Integer>> 
reduceDs = ds.groupBy("nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal")
-                                                       .reduceGroup(new 
GroupReduceFunction<CollectionDataSets.CrazyNested, Tuple2<String, Integer>>() {
-                                                               private static 
final long serialVersionUID = 1L;
+       @Test
+       public void testPojoExtendingFromTupleWithCustomFields() throws 
Exception {
+               /*
+                * Test Pojo extending from tuple WITH custom fields
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-                                                               @Override
-                                                               public void 
reduce(Iterable<CrazyNested> values,
-                                                                               
Collector<Tuple2<String, Integer>> out)
-                                                                               
throws Exception {
-                                                                       int c = 
0; String n = null;
-                                                                       
for(CrazyNested v : values) {
-                                                                               
c++; // haha
-                                                                               
n = v.nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal;
-                                                                       }
-                                                                       
out.collect(new Tuple2<String, Integer>(n,c));
-                                                               }});
-                                       
-                                       reduceDs.writeAsCsv(resultPath);
-                                       env.execute();
-                                       
-                                       // return expected result
-                                       return "aa,1\nbb,2\ncc,3\n";
-                               } 
-                               case 17: {
-                                       /*
-                                        * Test Pojo extending from tuple WITH 
custom fields
-                                        */
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                                       
-                                       DataSet<FromTupleWithCTor> ds = 
CollectionDataSets.getPojoExtendingFromTuple(env);
-                                       DataSet<Integer> reduceDs = 
ds.groupBy("special", "f2")
-                                                       .reduceGroup(new 
GroupReduceFunction<FromTupleWithCTor, Integer>() {
-                                                               private static 
final long serialVersionUID = 1L;
-                                                               @Override
-                                                               public void 
reduce(Iterable<FromTupleWithCTor> values,
-                                                                               
Collector<Integer> out)
-                                                                               
throws Exception {
-                                                                       int c = 
0;
-                                                                       
for(FromTuple v : values) {
-                                                                               
c++;
-                                                                       }
-                                                                       
out.collect(c);
-                                                               }});
-                                       
-                                       reduceDs.writeAsText(resultPath);
-                                       env.execute();
-                                       
-                                       // return expected result
-                                       return "3\n2\n";
-                               } 
-                               case 18: {
-                                       /*
-                                        * Test Pojo containing a Writable and 
Tuples
-                                        */
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                                       
-                                       DataSet<PojoContainingTupleAndWritable> 
ds = CollectionDataSets.getPojoContainingTupleAndWritable(env);
-                                       DataSet<Integer> reduceDs = 
ds.groupBy("hadoopFan", "theTuple.*") // full tuple selection
-                                                       .reduceGroup(new 
GroupReduceFunction<PojoContainingTupleAndWritable, Integer>() {
-                                                               private static 
final long serialVersionUID = 1L;
+               DataSet<FromTupleWithCTor> ds = 
CollectionDataSets.getPojoExtendingFromTuple(env);
+               DataSet<Integer> reduceDs = ds.groupBy("special", "f2")
+                               .reduceGroup(new GroupReducer2());
+
+               reduceDs.writeAsText(resultPath);
+               env.execute();
+
+               expected = "3\n2\n";
+       }
+
+       public static class GroupReducer2 implements 
GroupReduceFunction<FromTupleWithCTor, Integer> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public void reduce(Iterable<FromTupleWithCTor> values,
+                               Collector<Integer> out)
+               throws Exception {
+                       int c = 0;
+                       for(FromTuple v : values) {
+                               c++;
+                       }
+                       out.collect(c);
+               }
+       }
+
+       @Test
+       public void testPojoContainigWritableAndTuples() throws Exception {
+               /*
+                * Test Pojo containing a Writable and Tuples
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<PojoContainingTupleAndWritable> ds = 
CollectionDataSets.getPojoContainingTupleAndWritable(env);
+               DataSet<Integer> reduceDs = ds.groupBy("hadoopFan", 
"theTuple.*") // full tuple selection
+                               .reduceGroup(new GroupReducer3());
+
+               reduceDs.writeAsText(resultPath);
+               env.execute();
+
+               expected = "1\n5\n";
+       }
+
+       public static class GroupReducer3 implements 
GroupReduceFunction<PojoContainingTupleAndWritable, Integer> {
+               private static final long serialVersionUID = 1L;
                                                                
-                                                               @Override
-                                                               public void 
reduce(Iterable<PojoContainingTupleAndWritable> values,
-                                                                               
Collector<Integer> out)
-                                                                               
throws Exception {
-                                                                       int c = 
0;
-                                                                       
for(PojoContainingTupleAndWritable v : values) {
-                                                                               
c++;
-                                                                       }
-                                                                       
out.collect(c);
-                                                               }});
-                                       
-                                       reduceDs.writeAsText(resultPath);
-                                       env.execute();
-                                       
-                                       // return expected result
-                                       return "1\n5\n";
-                               } 
-                               case 19: {
-                                       /*
-                                        * Test Tuple containing pojos and 
regular fields
-                                        */
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                                       
-                                       DataSet<Tuple3<Integer,CrazyNested, 
POJO>> ds = CollectionDataSets.getTupleContainingPojos(env);
-                                       DataSet<Integer> reduceDs = 
ds.groupBy("f0", "f1.*") // nested full tuple selection
-                                                       .reduceGroup(new 
GroupReduceFunction<Tuple3<Integer,CrazyNested, POJO>, Integer>() {
-                                                               private static 
final long serialVersionUID = 1L;
-                                                               @Override
-                                                               public void 
reduce(Iterable<Tuple3<Integer,CrazyNested, POJO>> values,
-                                                                               
Collector<Integer> out)
-                                                                               
throws Exception {
-                                                                       int c = 
0;
-                                                                       
for(Tuple3<Integer,CrazyNested, POJO> v : values) {
-                                                                               
c++;
-                                                                       }
-                                                                       
out.collect(c);
-                                                               }});
-                                       
-                                       reduceDs.writeAsText(resultPath);
-                                       env.execute();
-                                       
-                                       // return expected result
-                                       return "3\n1\n";
-                               }
-                               case 20: {
-                                       /*
-                                        * Test string-based definition on 
group sort, based on test:
-                                        * check correctness of groupReduce 
with descending group sort
-                                        */
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                                       env.setDegreeOfParallelism(1);
+               @Override
+               public void reduce(Iterable<PojoContainingTupleAndWritable> 
values,
+                               Collector<Integer> out)
+               throws Exception {
+                       int c = 0;
+                       for(PojoContainingTupleAndWritable v : values) {
+                               c++;
+                       }
+                       out.collect(c);
+               }
+       }
 
-                                       DataSet<Tuple3<Integer, Long, String>> 
ds = CollectionDataSets.get3TupleDataSet(env);
-                                       DataSet<Tuple3<Integer, Long, String>> 
reduceDs = ds.
-                                                       
groupBy(1).sortGroup("f2", Order.DESCENDING).reduceGroup(new 
Tuple3SortedGroupReduce());
+       @Test
+       public void testTupleContainingPojosAndRegularFields() throws Exception 
{
+               /*
+                * Test Tuple containing pojos and regular fields
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-                                       reduceDs.writeAsCsv(resultPath);
-                                       env.execute();
+               DataSet<Tuple3<Integer,CrazyNested, POJO>> ds = 
CollectionDataSets.getTupleContainingPojos(env);
+               DataSet<Integer> reduceDs = ds.groupBy("f0", "f1.*") // nested 
full tuple selection
+                               .reduceGroup(new GroupReducer4());
 
-                                       // return expected result
-                                       return "1,1,Hi\n" +
-                                                       "5,2,Hello 
world-Hello\n" +
-                                                       "15,3,Luke Skywalker-I 
am fine.-Hello world, how are you?\n" +
-                                                       
"34,4,Comment#4-Comment#3-Comment#2-Comment#1\n" +
-                                                       
"65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" +
-                                                       
"111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
+               reduceDs.writeAsText(resultPath);
+               env.execute();
 
-                               }
-                               case 21: {
-                                       /*
-                                        * Test int-based definition on group 
sort, for (full) nested Tuple
-                                        */
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                                       env.setDegreeOfParallelism(1);
+               expected = "3\n1\n";
+       }
 
-                                       DataSet<Tuple2<Tuple2<Integer, 
Integer>, String>> ds = 
CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
-                                       DataSet<String> reduceDs = 
ds.groupBy("f1").sortGroup(0, Order.DESCENDING).reduceGroup(new 
NestedTupleReducer());
-                                       reduceDs.writeAsText(resultPath);
-                                       env.execute();
+       public static class GroupReducer4 implements 
GroupReduceFunction<Tuple3<Integer,CrazyNested, POJO>, Integer> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public void reduce(Iterable<Tuple3<Integer,CrazyNested, POJO>> 
values,
+                               Collector<Integer> out)
+               throws Exception {
+                       int c = 0;
+                       for(Tuple3<Integer,CrazyNested, POJO> v : values) {
+                               c++;
+                       }
+                       out.collect(c);
+               }
+       }
 
-                                       // return expected result
-                                       return "a--(2,1)-(1,3)-(1,2)-\n" +
-                                                       "b--(2,2)-\n"+
-                                                       
"c--(4,9)-(3,6)-(3,3)-\n";
-                               }
-                               case 22: {
-                                       /*
-                                        * Test int-based definition on group 
sort, for (partial) nested Tuple ASC
-                                        */
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                                       env.setDegreeOfParallelism(1);
+       @Test
+       public void testStringBasedDefinitionOnGroupSort() throws Exception {
+               /*
+                * Test string-based definition on group sort, based on test:
+                * check correctness of groupReduce with descending group sort
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(1);
+
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+                               groupBy(1).sortGroup("f2", 
Order.DESCENDING).reduceGroup(new Tuple3SortedGroupReduce());
+
+               reduceDs.writeAsCsv(resultPath);
+               env.execute();
+
+               expected = "1,1,Hi\n" +
+                               "5,2,Hello world-Hello\n" +
+                               "15,3,Luke Skywalker-I am fine.-Hello world, 
how are you?\n" +
+                               
"34,4,Comment#4-Comment#3-Comment#2-Comment#1\n" +
+                               
"65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" +
+                               
"111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
+       }
 
-                                       DataSet<Tuple2<Tuple2<Integer, 
Integer>, String>> ds = 
CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
-                                       // f0.f0 is first integer
-                                       DataSet<String> reduceDs = 
ds.groupBy("f1")
-                                                       .sortGroup("f0.f0", 
Order.ASCENDING)
-                                                       .sortGroup("f0.f1", 
Order.ASCENDING)
-                                                       .reduceGroup(new 
NestedTupleReducer());
-                                       reduceDs.writeAsText(resultPath);
-                                       env.execute();
-                                       
-                                       // return expected result
-                                       return "a--(1,2)-(1,3)-(2,1)-\n" +
-                                                       "b--(2,2)-\n"+
-                                                       
"c--(3,3)-(3,6)-(4,9)-\n";
-                               }
-                               case 23: {
-                                       /*
-                                        * Test string-based definition on 
group sort, for (partial) nested Tuple DESC
-                                        */
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                                       env.setDegreeOfParallelism(1);
+       @Test
+       public void testIntBasedDefinitionOnGroupSortForFullNestedTuple() 
throws Exception {
+               /*
+                * Test int-based definition on group sort, for (full) nested 
Tuple
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(1);
+
+               DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = 
CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
+               DataSet<String> reduceDs = ds.groupBy("f1").sortGroup(0, 
Order.DESCENDING).reduceGroup(new NestedTupleReducer());
+               reduceDs.writeAsText(resultPath);
+               env.execute();
+
+               expected = "a--(2,1)-(1,3)-(1,2)-\n" +
+                               "b--(2,2)-\n"+
+                               "c--(4,9)-(3,6)-(3,3)-\n";
+       }
 
-                                       DataSet<Tuple2<Tuple2<Integer, 
Integer>, String>> ds = 
CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
-                                       // f0.f0 is first integer
-                                       DataSet<String> reduceDs = 
ds.groupBy("f1").sortGroup("f0.f0", Order.DESCENDING).reduceGroup(new 
NestedTupleReducer());
-                                       reduceDs.writeAsText(resultPath);
-                                       env.execute();
-                                       
-                                       // return expected result
-                                       return "a--(2,1)-(1,3)-(1,2)-\n" +
-                                                       "b--(2,2)-\n"+
-                                                       
"c--(4,9)-(3,3)-(3,6)-\n";
-                               }
-                               case 24: {
-                                       /*
-                                        * Test string-based definition on 
group sort, for two grouping keys
-                                        */
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                                       env.setDegreeOfParallelism(1);
+       @Test
+       public void testIntBasedDefinitionOnGroupSortForPartialNestedTuple() 
throws Exception {
+               /*
+                * Test int-based definition on group sort, for (partial) 
nested Tuple ASC
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(1);
+
+               DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = 
CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
+               // f0.f0 is first integer
+               DataSet<String> reduceDs = ds.groupBy("f1")
+                               .sortGroup("f0.f0", Order.ASCENDING)
+                               .sortGroup("f0.f1", Order.ASCENDING)
+                               .reduceGroup(new NestedTupleReducer());
+               reduceDs.writeAsText(resultPath);
+               env.execute();
+
+               expected = "a--(1,2)-(1,3)-(2,1)-\n" +
+                               "b--(2,2)-\n"+
+                               "c--(3,3)-(3,6)-(4,9)-\n";
+       }
 
-                                       DataSet<Tuple2<Tuple2<Integer, 
Integer>, String>> ds = 
CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
-                                       // f0.f0 is first integer
-                                       DataSet<String> reduceDs = 
ds.groupBy("f1").sortGroup("f0.f0", Order.DESCENDING).sortGroup("f0.f1", 
Order.DESCENDING).reduceGroup(new NestedTupleReducer());
-                                       reduceDs.writeAsText(resultPath);
-                                       env.execute();
-                                       
-                                       // return expected result
-                                       return "a--(2,1)-(1,3)-(1,2)-\n" +
-                                                       "b--(2,2)-\n"+
-                                                       
"c--(4,9)-(3,6)-(3,3)-\n";
-                               }
-                               case 25: {
-                                       /*
-                                        * Test string-based definition on 
group sort, for two grouping keys with Pojos
-                                        */
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                                       env.setDegreeOfParallelism(1);
+       @Test
+       public void testStringBasedDefinitionOnGroupSortForPartialNestedTuple() 
throws Exception {
+               /*
+                * Test string-based definition on group sort, for (partial) 
nested Tuple DESC
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(1);
+
+               DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = 
CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
+               // f0.f0 is first integer
+               DataSet<String> reduceDs = ds.groupBy("f1").sortGroup("f0.f0", 
Order.DESCENDING).reduceGroup(new NestedTupleReducer());
+               reduceDs.writeAsText(resultPath);
+               env.execute();
+
+               expected = "a--(2,1)-(1,3)-(1,2)-\n" +
+                               "b--(2,2)-\n"+
+                               "c--(4,9)-(3,3)-(3,6)-\n";
+       }
 
-                                       DataSet<PojoContainingTupleAndWritable> 
ds = CollectionDataSets.getGroupSortedPojoContainingTupleAndWritable(env);
-                                       // f0.f0 is first integer
-                                       DataSet<String> reduceDs = 
ds.groupBy("hadoopFan").sortGroup("theTuple.f0", 
Order.DESCENDING).sortGroup("theTuple.f1", Order.DESCENDING)
-                                                       .reduceGroup(new 
GroupReduceFunction<CollectionDataSets.PojoContainingTupleAndWritable, 
String>() {
-                                                               @Override
-                                                               public void 
reduce(
-                                                                               
Iterable<PojoContainingTupleAndWritable> values,
-                                                                               
Collector<String> out) throws Exception {
-                                                                       boolean 
once = false;
-                                                                       
StringBuilder concat = new StringBuilder();
-                                                                       
for(PojoContainingTupleAndWritable value : values) {
-                                                                               
if(!once) {
-                                                                               
        concat.append(value.hadoopFan.get());
-                                                                               
        concat.append("---");
-                                                                               
        once = true;
-                                                                               
}
-                                                                               
concat.append(value.theTuple);
-                                                                               
concat.append("-");
-                                                                       }
-                                                                       
out.collect(concat.toString());
-                                                               }
-                                       });
-                                       reduceDs.writeAsText(resultPath);
-                                       env.execute();
-                                       
-                                       // return expected result
-                                       return "1---(10,100)-\n" +
-                                                       
"2---(30,600)-(30,400)-(30,200)-(20,201)-(20,200)-\n";
-                               }
-                               case 26: {
-                                       /*
-                                        * Test grouping with pojo containing 
multiple pojos (was a bug)
-                                        */
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                                       env.setDegreeOfParallelism(1);
+       @Test
+       public void testStringBasedDefinitionOnGroupSortForTwoGroupingKeys() 
throws Exception {
+               /*
+                * Test string-based definition on group sort, for two grouping 
keys
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(1);
+
+               DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = 
CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
+               // f0.f0 is first integer
+               DataSet<String> reduceDs = ds.groupBy("f1").sortGroup("f0.f0", 
Order.DESCENDING).sortGroup("f0.f1", Order.DESCENDING).reduceGroup(new 
NestedTupleReducer());
+               reduceDs.writeAsText(resultPath);
+               env.execute();
+
+               expected = "a--(2,1)-(1,3)-(1,2)-\n" +
+                               "b--(2,2)-\n"+
+                               "c--(4,9)-(3,6)-(3,3)-\n";
+       }
 
-                                       
DataSet<CollectionDataSets.PojoWithMultiplePojos> ds = 
CollectionDataSets.getPojoWithMultiplePojos(env);
-                                       // f0.f0 is first integer
-                                       DataSet<String> reduceDs = 
ds.groupBy("p2.a2")
-                                                       .reduceGroup(new 
GroupReduceFunction<CollectionDataSets.PojoWithMultiplePojos, String>() {
-                                                               @Override
-                                                               public void 
reduce(
-                                                                               
Iterable<CollectionDataSets.PojoWithMultiplePojos> values,
-                                                                               
Collector<String> out) throws Exception {
-                                                                       
StringBuilder concat = new StringBuilder();
-                                                                       
for(CollectionDataSets.PojoWithMultiplePojos value : values) {
-                                                                               
concat.append(value.p2.a2);
-                                                                       }
-                                                                       
out.collect(concat.toString());
-                                                               }
-                                                       });
-                                       reduceDs.writeAsText(resultPath);
-                                       env.execute();
+       @Test
+       public void 
testStringBasedDefinitionOnGroupSortForTwoGroupingKeysWithPojos() throws 
Exception {
+               /*
+                * Test string-based definition on group sort, for two grouping 
keys with Pojos
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(1);
+
+               DataSet<PojoContainingTupleAndWritable> ds = 
CollectionDataSets.getGroupSortedPojoContainingTupleAndWritable(env);
+               // f0.f0 is first integer
+               DataSet<String> reduceDs = 
ds.groupBy("hadoopFan").sortGroup("theTuple.f0", 
Order.DESCENDING).sortGroup("theTuple.f1", Order.DESCENDING)
+                               .reduceGroup(new GroupReducer5());
+               reduceDs.writeAsText(resultPath);
+               env.execute();
+
+               expected = "1---(10,100)-\n" +
+                               
"2---(30,600)-(30,400)-(30,200)-(20,201)-(20,200)-\n";
+       }
 
-                                       // return expected result
-                                       return "b\nccc\nee\n";
+       public static class GroupReducer5 implements 
GroupReduceFunction<CollectionDataSets.PojoContainingTupleAndWritable, String> {
+               @Override
+               public void reduce(
+                               Iterable<PojoContainingTupleAndWritable> values,
+                               Collector<String> out) throws Exception {
+                       boolean once = false;
+                       StringBuilder concat = new StringBuilder();
+                       for(PojoContainingTupleAndWritable value : values) {
+                               if(!once) {
+                                       concat.append(value.hadoopFan.get());
+                                       concat.append("---");
+                                       once = true;
                                }
+                               concat.append(value.theTuple);
+                               concat.append("-");
+                       }
+                       out.collect(concat.toString());
+               }
+       }
 
+       @Test
+       public void testGroupingWithPojoContainingMultiplePojos() throws 
Exception {
+               /*
+                * Test grouping with pojo containing multiple pojos (was a bug)
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(1);
+
+               DataSet<CollectionDataSets.PojoWithMultiplePojos> ds = 
CollectionDataSets.getPojoWithMultiplePojos(env);
+               // f0.f0 is first integer
+               DataSet<String> reduceDs = ds.groupBy("p2.a2")
+                               .reduceGroup(new GroupReducer6());
+               reduceDs.writeAsText(resultPath);
+               env.execute();
+
+               expected = "b\nccc\nee\n";
+       }
+
+       public static class GroupReducer6 implements 
GroupReduceFunction<CollectionDataSets.PojoWithMultiplePojos, String> {
+               @Override
+               public void reduce(
+                               
Iterable<CollectionDataSets.PojoWithMultiplePojos> values,
+                               Collector<String> out) throws Exception {
+                       StringBuilder concat = new StringBuilder();
+                       for(CollectionDataSets.PojoWithMultiplePojos value : 
values) {
+                               concat.append(value.p2.a2);
+                       }
+                       out.collect(concat.toString());
+               }
                                case 27: {
                                        /*
                                         * Test Java collections within pojos ( 
== test kryo)
@@ -831,12 +838,6 @@ public class GroupReduceITCase extends JavaProgramTestBase 
{
                                                        
"PojoWithCollection{pojos.size()=2, key=0, sqlDate=1976-05-03, 
bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=31104000, 
mixed=null}\n";
                                }
 
-                               default: {
-                                       throw new 
IllegalArgumentException("Invalid program id");
-                               }
-                       }
-               }
-       
        }
        
        

Reply via email to