http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
new file mode 100644
index 0000000..2d6897b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets.CustomType;
+import org.apache.flink.test.operators.util.CollectionDataSets.PojoWithDateAndEnum;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Integration tests for {@link ReduceFunction} and {@link RichReduceFunction}.
+ */
+@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
+public class ReduceITCase extends MultipleProgramsTestBase {
+
+       public ReduceITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testReduceOnTuplesWithKeyFieldSelector() throws Exception {
+               /*
+                * Reduce on tuples with key field selector
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+                               groupBy(1).reduce(new Tuple3Reduce("B-)"));
+
+               List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
+
+               String expected = "1,1,Hi\n" +
+                               "5,2,B-)\n" +
+                               "15,3,B-)\n" +
+                               "34,4,B-)\n" +
+                               "65,5,B-)\n" +
+                               "111,6,B-)\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testReduceOnTupleWithMultipleKeyFieldSelectors() throws Exception {
+               /*
+                * Reduce on tuples with multiple key field selectors
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
+                               groupBy(4, 0).reduce(new Tuple5Reduce());
+
+               List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs
+                               .collect();
+
+               String expected = "1,1,0,Hallo,1\n" +
+                               "2,3,2,Hallo Welt wie,1\n" +
+                               "2,2,1,Hallo Welt,2\n" +
+                               "3,9,0,P-),2\n" +
+                               "3,6,5,BCD,3\n" +
+                               "4,17,0,P-),1\n" +
+                               "4,17,0,P-),2\n" +
+                               "5,11,10,GHI,1\n" +
+                               "5,29,0,P-),2\n" +
+                               "5,25,0,P-),3\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testReduceOnTuplesWithKeyExtractor() throws Exception {
+               /*
+                * Reduce on tuples with key extractor
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+                               groupBy(new KeySelector1()).reduce(new Tuple3Reduce("B-)"));
+
+               List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
+
+               String expected = "1,1,Hi\n" +
+                               "5,2,B-)\n" +
+                               "15,3,B-)\n" +
+                               "34,4,B-)\n" +
+                               "65,5,B-)\n" +
+                               "111,6,B-)\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       private static class KeySelector1 implements KeySelector<Tuple3<Integer, Long, String>, Long> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public Long getKey(Tuple3<Integer, Long, String> in) {
+                       return in.f1;
+               }
+       }
+
+       @Test
+       public void testReduceOnCustomTypeWithKeyExtractor() throws Exception {
+               /*
+                * Reduce on custom type with key extractor
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+               DataSet<CustomType> reduceDs = ds.
+                               groupBy(new KeySelector2()).reduce(new CustomTypeReduce());
+
+               List<CustomType> result = reduceDs.collect();
+
+               String expected = "1,0,Hi\n" +
+                               "2,3,Hello!\n" +
+                               "3,12,Hello!\n" +
+                               "4,30,Hello!\n" +
+                               "5,60,Hello!\n" +
+                               "6,105,Hello!\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       private static class KeySelector2 implements KeySelector<CustomType, Integer> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public Integer getKey(CustomType in) {
+                       return in.myInt;
+               }
+       }
+
+       @Test
+       public void testAllReduceForTuple() throws Exception {
+               /*
+                * All-reduce for tuple
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+                               reduce(new AllAddingTuple3Reduce());
+
+               List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
+
+               String expected = "231,91,Hello World\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testAllReduceForCustomTypes() throws Exception {
+               /*
+                * All-reduce for custom types
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+               DataSet<CustomType> reduceDs = ds.
+                               reduce(new AllAddingCustomTypeReduce());
+
+               List<CustomType> result = reduceDs.collect();
+
+               String expected = "91,210,Hello!";
+
+               compareResultAsText(result, expected);
+       }
+
+       @Test
+       public void testReduceWithBroadcastSet() throws Exception {
+               /*
+                * Reduce with broadcast set
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+                               groupBy(1).reduce(new BCTuple3Reduce()).withBroadcastSet(intDs, "ints");
+
+               List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
+
+               String expected = "1,1,Hi\n" +
+                               "5,2,55\n" +
+                               "15,3,55\n" +
+                               "34,4,55\n" +
+                               "65,5,55\n" +
+                               "111,6,55\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testReduceATupleReturningKeySelector() throws Exception {
+               /*
+                * Reduce with a Tuple-returning KeySelector
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
+                               .groupBy(new KeySelector3()).reduce(new Tuple5Reduce());
+
+               List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs
+                               .collect();
+
+               String expected = "1,1,0,Hallo,1\n" +
+                               "2,3,2,Hallo Welt wie,1\n" +
+                               "2,2,1,Hallo Welt,2\n" +
+                               "3,9,0,P-),2\n" +
+                               "3,6,5,BCD,3\n" +
+                               "4,17,0,P-),1\n" +
+                               "4,17,0,P-),2\n" +
+                               "5,11,10,GHI,1\n" +
+                               "5,29,0,P-),2\n" +
+                               "5,25,0,P-),3\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       private static class KeySelector3 implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) {
+                       return new Tuple2<Integer, Long>(t.f0, t.f4);
+               }
+       }
+
+       @Test
+       public void testReduceOnTupleWithMultipleKeyExpressions() throws Exception {
+               /*
+                * Case 2 with String-based field expression
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
+                               .groupBy("f4", "f0").reduce(new Tuple5Reduce());
+
+               List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs
+                               .collect();
+
+               String expected = "1,1,0,Hallo,1\n" +
+                               "2,3,2,Hallo Welt wie,1\n" +
+                               "2,2,1,Hallo Welt,2\n" +
+                               "3,9,0,P-),2\n" +
+                               "3,6,5,BCD,3\n" +
+                               "4,17,0,P-),1\n" +
+                               "4,17,0,P-),2\n" +
+                               "5,11,10,GHI,1\n" +
+                               "5,29,0,P-),2\n" +
+                               "5,25,0,P-),3\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testReduceOnTupleWithMultipleKeyExpressionsWithHashHint() throws Exception {
+               /*
+                * Case 2 with String-based field expression and a hash-based combiner
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
+                       .groupBy("f4", "f0").reduce(new Tuple5Reduce()).setCombineHint(CombineHint.HASH);
+
+               List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs
+                       .collect();
+
+               String expected = "1,1,0,Hallo,1\n" +
+                       "2,3,2,Hallo Welt wie,1\n" +
+                       "2,2,1,Hallo Welt,2\n" +
+                       "3,9,0,P-),2\n" +
+                       "3,6,5,BCD,3\n" +
+                       "4,17,0,P-),1\n" +
+                       "4,17,0,P-),2\n" +
+                       "5,11,10,GHI,1\n" +
+                       "5,29,0,P-),2\n" +
+                       "5,25,0,P-),3\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testSupportForDateAndEnumSerialization() throws Exception {
+               /*
+                * Test support for Date and enum serialization
+                */
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<PojoWithDateAndEnum> ds = env.generateSequence(0, 2).map(new Mapper1());
+               ds = ds.union(CollectionDataSets.getPojoWithDateAndEnum(env));
+
+               DataSet<String> res = ds.groupBy("group").reduceGroup(new GroupReducer1());
+
+               List<String> result = res.collect();
+
+               String expected = "ok\nok";
+
+               compareResultAsText(result, expected);
+       }
+
+       private static class Mapper1 implements MapFunction<Long, PojoWithDateAndEnum> {
+               @Override
+               public PojoWithDateAndEnum map(Long value) throws Exception {
+                       int l = value.intValue();
+                       switch (l) {
+                               case 0:
+                                       PojoWithDateAndEnum one = new PojoWithDateAndEnum();
+                                       one.group = "a";
+                                       one.date = new Date(666);
+                                       one.cat = CollectionDataSets.Category.CAT_A;
+                                       return one;
+                               case 1:
+                                       PojoWithDateAndEnum two = new PojoWithDateAndEnum();
+                                       two.group = "a";
+                                       two.date = new Date(666);
+                                       two.cat = CollectionDataSets.Category.CAT_A;
+                                       return two;
+                               case 2:
+                                       PojoWithDateAndEnum three = new PojoWithDateAndEnum();
+                                       three.group = "b";
+                                       three.date = new Date(666);
+                                       three.cat = CollectionDataSets.Category.CAT_B;
+                                       return three;
+                       }
+                       throw new RuntimeException("Unexpected value for l=" + l);
+               }
+       }
+
+       private static class GroupReducer1 implements GroupReduceFunction<CollectionDataSets.PojoWithDateAndEnum, String> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void reduce(Iterable<PojoWithDateAndEnum> values,
+                               Collector<String> out) throws Exception {
+                       for (PojoWithDateAndEnum val : values) {
+                               if (val.cat == CollectionDataSets.Category.CAT_A) {
+                                       Assert.assertEquals("a", val.group);
+                               } else if (val.cat == CollectionDataSets.Category.CAT_B) {
+                                       Assert.assertEquals("b", val.group);
+                               } else {
+                                       Assert.fail("error. Cat = " + val.cat);
+                               }
+                               Assert.assertEquals(666, val.date.getTime());
+                       }
+                       out.collect("ok");
+               }
+       }
+
+       private static class Tuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> {
+               private static final long serialVersionUID = 1L;
+               private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
+               private final String f2Replace;
+
+               public Tuple3Reduce() {
+                       this.f2Replace = null;
+               }
+
+               public Tuple3Reduce(String f2Replace) {
+                       this.f2Replace = f2Replace;
+               }
+
+               @Override
+               public Tuple3<Integer, Long, String> reduce(
+                               Tuple3<Integer, Long, String> in1,
+                               Tuple3<Integer, Long, String> in2) throws Exception {
+
+                       if (f2Replace == null) {
+                               out.setFields(in1.f0 + in2.f0, in1.f1, in1.f2);
+                       } else {
+                               out.setFields(in1.f0 + in2.f0, in1.f1, this.f2Replace);
+                       }
+                       return out;
+               }
+       }
+
+       private static class Tuple5Reduce implements ReduceFunction<Tuple5<Integer, Long, Integer, String, Long>> {
+               private static final long serialVersionUID = 1L;
+               private final Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
+
+               @Override
+               public Tuple5<Integer, Long, Integer, String, Long> reduce(
+                               Tuple5<Integer, Long, Integer, String, Long> in1,
+                               Tuple5<Integer, Long, Integer, String, Long> in2)
+                                               throws Exception {
+
+                       out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
+                       return out;
+               }
+       }
+
+       private static class CustomTypeReduce implements ReduceFunction<CustomType> {
+               private static final long serialVersionUID = 1L;
+               private final CustomType out = new CustomType();
+
+               @Override
+               public CustomType reduce(CustomType in1, CustomType in2)
+                               throws Exception {
+
+                       out.myInt = in1.myInt;
+                       out.myLong = in1.myLong + in2.myLong;
+                       out.myString = "Hello!";
+                       return out;
+               }
+       }
+
+       private static class AllAddingTuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> {
+               private static final long serialVersionUID = 1L;
+               private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
+
+               @Override
+               public Tuple3<Integer, Long, String> reduce(
+                               Tuple3<Integer, Long, String> in1,
+                               Tuple3<Integer, Long, String> in2) throws Exception {
+
+                       out.setFields(in1.f0 + in2.f0, in1.f1 + in2.f1, "Hello World");
+                       return out;
+               }
+       }
+
+       private static class AllAddingCustomTypeReduce implements ReduceFunction<CustomType> {
+               private static final long serialVersionUID = 1L;
+               private final CustomType out = new CustomType();
+
+               @Override
+               public CustomType reduce(CustomType in1, CustomType in2)
+                               throws Exception {
+
+                       out.myInt = in1.myInt + in2.myInt;
+                       out.myLong = in1.myLong + in2.myLong;
+                       out.myString = "Hello!";
+                       return out;
+               }
+       }
+
+       private static class BCTuple3Reduce extends RichReduceFunction<Tuple3<Integer, Long, String>> {
+               private static final long serialVersionUID = 1L;
+               private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
+               private String f2Replace = "";
+
+               @Override
+               public void open(Configuration config) {
+
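+                       // sum up the broadcast "ints" data set (55 for the test data) and
+                       // use it as the replacement string for field f2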
+                       Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
+                       int sum = 0;
+                       for (Integer i : ints) {
+                               sum += i;
+                       }
+                       f2Replace = sum + "";
+
+               }
+
+               @Override
+               public Tuple3<Integer, Long, String> reduce(
+                               Tuple3<Integer, Long, String> in1,
+                               Tuple3<Integer, Long, String> in2) throws Exception {
+
+                       out.setFields(in1.f0 + in2.f0, in1.f1, this.f2Replace);
+                       return out;
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceWithCombinerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceWithCombinerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceWithCombinerITCase.java
new file mode 100644
index 0000000..c6d340a
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceWithCombinerITCase.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.CombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.UnsortedGrouping;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Integration tests for {@link GroupCombineFunction}.
+ */
+@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
+public class ReduceWithCombinerITCase extends MultipleProgramsTestBase {
+
+       public ReduceWithCombinerITCase(TestExecutionMode mode) {
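+               // note: the parameterized mode is ignored here; these tests always run in
+               // cluster mode, where the combine phase is actually executed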
+               super(TestExecutionMode.CLUSTER);
+       }
+
+       @Test
+       public void testReduceOnNonKeyedDataset() throws Exception {
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(4);
+
+               // creates the input data and distributes them evenly among the available downstream tasks
+               DataSet<Tuple2<Integer, Boolean>> input = createNonKeyedInput(env);
+               List<Tuple2<Integer, Boolean>> actual = input.reduceGroup(new NonKeyedCombReducer()).collect();
+               String expected = "10,true\n";
+
+               compareResultAsTuples(actual, expected);
+       }
+
+       @Test
+       public void testForkingReduceOnNonKeyedDataset() throws Exception {
+
+               // set up the execution environment
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(4);
+
+               // creates the input data and distributes them evenly among the available downstream tasks
+               DataSet<Tuple2<Integer, Boolean>> input = createNonKeyedInput(env);
+
+               DataSet<Tuple2<Integer, Boolean>> r1 = input.reduceGroup(new NonKeyedCombReducer());
+               DataSet<Tuple2<Integer, Boolean>> r2 = input.reduceGroup(new NonKeyedGroupCombReducer());
+
+               List<Tuple2<Integer, Boolean>> actual = r1.union(r2).collect();
+               String expected = "10,true\n10,true\n";
+               compareResultAsTuples(actual, expected);
+       }
+
+       @Test
+       public void testReduceOnKeyedDataset() throws Exception {
+
+               // set up the execution environment
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(4);
+
+               // creates the input data and distributes them evenly among the available downstream tasks
+               DataSet<Tuple3<String, Integer, Boolean>> input = createKeyedInput(env);
+               List<Tuple3<String, Integer, Boolean>> actual = input.groupBy(0).reduceGroup(new KeyedCombReducer()).collect();
+               String expected = "k1,6,true\nk2,4,true\n";
+
+               compareResultAsTuples(actual, expected);
+       }
+
+       @Test
+       public void testReduceOnKeyedDatasetWithSelector() throws Exception {
+
+               // set up the execution environment
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(4);
+
+               // creates the input data and distributes them evenly among the available downstream tasks
+               DataSet<Tuple3<String, Integer, Boolean>> input = createKeyedInput(env);
+
+               List<Tuple3<String, Integer, Boolean>> actual = input
+                       .groupBy(new KeySelectorX())
+                       .reduceGroup(new KeyedCombReducer())
+                       .collect();
+               String expected = "k1,6,true\nk2,4,true\n";
+
+               compareResultAsTuples(actual, expected);
+       }
+
+       @Test
+       public void testForkingReduceOnKeyedDataset() throws Exception {
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(4);
+
+               // creates the input data and distributes them evenly among the available downstream tasks
+               DataSet<Tuple3<String, Integer, Boolean>> input = createKeyedInput(env);
+
+               UnsortedGrouping<Tuple3<String, Integer, Boolean>> counts = input.groupBy(0);
+
+               DataSet<Tuple3<String, Integer, Boolean>> r1 = counts.reduceGroup(new KeyedCombReducer());
+               DataSet<Tuple3<String, Integer, Boolean>> r2 = counts.reduceGroup(new KeyedGroupCombReducer());
+
+               List<Tuple3<String, Integer, Boolean>> actual = r1.union(r2).collect();
+               String expected = "k1,6,true\n" +
+                       "k2,4,true\n" +
+                       "k1,6,true\n" +
+                       "k2,4,true\n";
+               compareResultAsTuples(actual, expected);
+       }
+
+       @Test
+       public void testForkingReduceOnKeyedDatasetWithSelection() throws Exception {
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(4);
+
+               // creates the input data and distributes them evenly among the available downstream tasks
+               DataSet<Tuple3<String, Integer, Boolean>> input = createKeyedInput(env);
+
+               UnsortedGrouping<Tuple3<String, Integer, Boolean>> counts = input.groupBy(new KeySelectorX());
+
+               DataSet<Tuple3<String, Integer, Boolean>> r1 = counts.reduceGroup(new KeyedCombReducer());
+               DataSet<Tuple3<String, Integer, Boolean>> r2 = counts.reduceGroup(new KeyedGroupCombReducer());
+
+               List<Tuple3<String, Integer, Boolean>> actual = r1.union(r2).collect();
+               String expected = "k1,6,true\n" +
+                       "k2,4,true\n" +
+                       "k1,6,true\n" +
+                       "k2,4,true\n";
+
+               compareResultAsTuples(actual, expected);
+       }
+
+       private DataSet<Tuple2<Integer, Boolean>> createNonKeyedInput(ExecutionEnvironment env) {
+               return env.fromCollection(Arrays.asList(
+                       new Tuple2<>(1, false),
+                       new Tuple2<>(1, false),
+                       new Tuple2<>(1, false),
+                       new Tuple2<>(1, false),
+                       new Tuple2<>(1, false),
+                       new Tuple2<>(1, false),
+                       new Tuple2<>(1, false),
+                       new Tuple2<>(1, false),
+                       new Tuple2<>(1, false),
+                       new Tuple2<>(1, false))
+               ).rebalance();
+       }
+
+       private static class NonKeyedCombReducer implements CombineFunction<Tuple2<Integer, Boolean>, Tuple2<Integer, Boolean>>,
+               GroupReduceFunction<Tuple2<Integer, Boolean>, Tuple2<Integer, Boolean>> {
+
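+               // all input flags are false; combine() inverts the flag while reduce()
+               // does not, so the result is (10, true) only if the combine phase ran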
+               @Override
+               public Tuple2<Integer, Boolean> combine(Iterable<Tuple2<Integer, Boolean>> values) throws Exception {
+                       int sum = 0;
+                       boolean flag = true;
+
+                       for (Tuple2<Integer, Boolean> tuple : values) {
+                               sum += tuple.f0;
+                               flag &= !tuple.f1;
+                       }
+                       return new Tuple2<>(sum, flag);
+               }
+
+               @Override
+               public void reduce(Iterable<Tuple2<Integer, Boolean>> values, Collector<Tuple2<Integer, Boolean>> out) throws Exception {
+                       int sum = 0;
+                       boolean flag = true;
+                       for (Tuple2<Integer, Boolean> tuple : values) {
+                               sum += tuple.f0;
+                               flag &= tuple.f1;
+                       }
+                       out.collect(new Tuple2<>(sum, flag));
+               }
+       }
+
+       private static class NonKeyedGroupCombReducer implements GroupCombineFunction<Tuple2<Integer, Boolean>, Tuple2<Integer, Boolean>>,
+               GroupReduceFunction<Tuple2<Integer, Boolean>, Tuple2<Integer, Boolean>> {
+
+               @Override
+               public void reduce(Iterable<Tuple2<Integer, Boolean>> values, Collector<Tuple2<Integer, Boolean>> out) throws Exception {
+                       int sum = 0;
+                       boolean flag = true;
+                       for (Tuple2<Integer, Boolean> tuple : values) {
+                               sum += tuple.f0;
+                               flag &= tuple.f1;
+                       }
+                       out.collect(new Tuple2<>(sum, flag));
+               }
+
+               @Override
+               public void combine(Iterable<Tuple2<Integer, Boolean>> values, Collector<Tuple2<Integer, Boolean>> out) throws Exception {
+                       int sum = 0;
+                       boolean flag = true;
+                       for (Tuple2<Integer, Boolean> tuple : values) {
+                               sum += tuple.f0;
+                               flag &= !tuple.f1;
+                       }
+                       out.collect(new Tuple2<>(sum, flag));
+               }
+       }
+
+       private DataSet<Tuple3<String, Integer, Boolean>> createKeyedInput(ExecutionEnvironment env) {
+               return env.fromCollection(Arrays.asList(
+                       new Tuple3<>("k1", 1, false),
+                       new Tuple3<>("k1", 1, false),
+                       new Tuple3<>("k1", 1, false),
+                       new Tuple3<>("k2", 1, false),
+                       new Tuple3<>("k1", 1, false),
+                       new Tuple3<>("k1", 1, false),
+                       new Tuple3<>("k2", 1, false),
+                       new Tuple3<>("k2", 1, false),
+                       new Tuple3<>("k1", 1, false),
+                       new Tuple3<>("k2", 1, false))
+               ).rebalance();
+       }
+
+       private static class KeySelectorX implements KeySelector<Tuple3<String, Integer, Boolean>, String> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public String getKey(Tuple3<String, Integer, Boolean> in) {
+                       return in.f0;
+               }
+       }
+
+       private static class KeyedCombReducer implements CombineFunction<Tuple3<String, Integer, Boolean>, Tuple3<String, Integer, Boolean>>,
+               GroupReduceFunction<Tuple3<String, Integer, Boolean>, Tuple3<String, Integer, Boolean>> {
+
+               @Override
+               public Tuple3<String, Integer, Boolean> combine(Iterable<Tuple3<String, Integer, Boolean>> values) throws Exception {
+                       String key = null;
+                       int sum = 0;
+                       boolean flag = true;
+
+                       for (Tuple3<String, Integer, Boolean> tuple : values) {
+                               key = (key == null) ? tuple.f0 : key;
+                               sum += tuple.f1;
+                               flag &= !tuple.f2;
+                       }
+                       return new Tuple3<>(key, sum, flag);
+               }
+
+               @Override
+               public void reduce(Iterable<Tuple3<String, Integer, Boolean>> values, Collector<Tuple3<String, Integer, Boolean>> out) throws Exception {
+                       String key = null;
+                       int sum = 0;
+                       boolean flag = true;
+
+                       for (Tuple3<String, Integer, Boolean> tuple : values) {
+                               key = (key == null) ? tuple.f0 : key;
+                               sum += tuple.f1;
+                               flag &= tuple.f2;
+                       }
+                       out.collect(new Tuple3<>(key, sum, flag));
+               }
+       }
+
+       private static class KeyedGroupCombReducer implements GroupCombineFunction<Tuple3<String, Integer, Boolean>, Tuple3<String, Integer, Boolean>>,
+               GroupReduceFunction<Tuple3<String, Integer, Boolean>, Tuple3<String, Integer, Boolean>> {
+
+               @Override
+               public void combine(Iterable<Tuple3<String, Integer, Boolean>> values, Collector<Tuple3<String, Integer, Boolean>> out) throws Exception {
+                       String key = null;
+                       int sum = 0;
+                       boolean flag = true;
+
+                       for (Tuple3<String, Integer, Boolean> tuple : values) {
+                               key = (key == null) ? tuple.f0 : key;
+                               sum += tuple.f1;
+                               flag &= !tuple.f2;
+                       }
+                       out.collect(new Tuple3<>(key, sum, flag));
+               }
+
+               @Override
+               public void reduce(Iterable<Tuple3<String, Integer, Boolean>> values, Collector<Tuple3<String, Integer, Boolean>> out) throws Exception {
+                       String key = null;
+                       int sum = 0;
+                       boolean flag = true;
+
+                       for (Tuple3<String, Integer, Boolean> tuple : values) {
+                               key = (key == null) ? tuple.f0 : key;
+                               sum += tuple.f1;
+                               flag &= tuple.f2;
+                       }
+                       out.collect(new Tuple3<>(key, sum, flag));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
new file mode 100644
index 0000000..36eded6
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.runtime.minicluster.StandaloneMiniCluster;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration tests for {@link org.apache.flink.api.java.RemoteEnvironment}.
+ */
+@SuppressWarnings("serial")
+public class RemoteEnvironmentITCase extends TestLogger {
+
+       private static final int TM_SLOTS = 4;
+
+       private static final int USER_DOP = 2;
+
+       private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms";
+
+       private static final String VALID_STARTUP_TIMEOUT = "100 s";
+
+       private static Configuration configuration;
+
+       private static StandaloneMiniCluster cluster;
+
+       @BeforeClass
+       public static void setupCluster() throws Exception {
+               configuration = new Configuration();
+
+               configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
+
+               cluster = new StandaloneMiniCluster(configuration);
+       }
+
+       @AfterClass
+       public static void tearDownCluster() throws Exception {
+               cluster.close();
+       }
+
+       /**
+        * Ensure that Akka configuration parameters can be set.
+        */
+       @Test(expected = FlinkException.class)
+       public void testInvalidAkkaConfiguration() throws Throwable {
+               Configuration config = new Configuration();
+               config.setString(AkkaOptions.STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
+
+               final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+                               cluster.getHostname(),
+                               cluster.getPort(),
+                               config
+               );
+               env.getConfig().disableSysoutLogging();
+
+               DataSet<String> result = env.createInput(new TestNonRichInputFormat());
+               result.output(new LocalCollectionOutputFormat<>(new ArrayList<String>()));
+               try {
+                       env.execute();
+                       Assert.fail("Program should not run successfully because of the invalid Akka settings.");
+               } catch (ProgramInvocationException ex) {
+                       throw ex.getCause();
+               }
+       }
+
+       /**
+        * Ensure that the program parallelism can be set even if the configuration is supplied.
+        */
+       @Test
+       public void testUserSpecificParallelism() throws Exception {
+               Configuration config = new Configuration();
+               config.setString(AkkaOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);
+
+               final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+                               cluster.getHostname(),
+                               cluster.getPort(),
+                               config
+               );
+               env.setParallelism(USER_DOP);
+               env.getConfig().disableSysoutLogging();
+
+               DataSet<Integer> result = env.createInput(new ParallelismDependentInputFormat())
+                               .rebalance()
+                               .mapPartition(new RichMapPartitionFunction<Integer, Integer>() {
+                                       @Override
+                                       public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
+                                               out.collect(getRuntimeContext().getIndexOfThisSubtask());
+                                       }
+                               });
+               List<Integer> resultCollection = result.collect();
+               assertEquals(USER_DOP, resultCollection.size());
+       }
+
+       private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
+
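+               // each parallel instance emits exactly one record, so the collected result
+               // has one element per subtask; createInputSplits additionally verifies that
+               // the user-specified parallelism was propagated to split generation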
+               private transient boolean emitted;
+
+               @Override
+               public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
+                       assertEquals(USER_DOP, numSplits);
+                       return super.createInputSplits(numSplits);
+               }
+
+               @Override
+               public boolean reachedEnd() {
+                       return emitted;
+               }
+
+               @Override
+               public Integer nextRecord(Integer reuse) {
+                       if (emitted) {
+                               return null;
+                       }
+                       emitted = true;
+                       return 1;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/ReplicatingDataSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ReplicatingDataSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ReplicatingDataSourceITCase.java
new file mode 100644
index 0000000..c023cf4
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/ReplicatingDataSourceITCase.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.io.ReplicatingInputFormat;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.ParallelIteratorInputFormat;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.NumberSequenceIterator;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Tests for replicating DataSources.
+ */
+@RunWith(Parameterized.class)
+public class ReplicatingDataSourceITCase extends MultipleProgramsTestBase {
+
+       public ReplicatingDataSourceITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testReplicatedSourceToJoin() throws Exception {
+               /*
+                * Test replicated source going into join
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple1<Long>> source1 = env.createInput(new ReplicatingInputFormat<Long, GenericInputSplit>
+                               (new ParallelIteratorInputFormat<Long>(new NumberSequenceIterator(0L, 1000L))), BasicTypeInfo.LONG_TYPE_INFO)
+                               .map(new ToTuple());
+               DataSet<Tuple1<Long>> source2 = env.generateSequence(0L, 1000L).map(new ToTuple());
+
+               DataSet<Tuple> pairs = source1.join(source2).where(0).equalTo(0)
+                               .projectFirst(0)
+                               .sum(0);
+
+               List<Tuple> result = pairs.collect();
+
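+               // sum of the joined keys: 0 + 1 + ... + 1000 = 1000 * 1001 / 2 = 500500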
+               String expectedResult = "(500500)";
+
+               compareResultAsText(result, expectedResult);
+       }
+
+       @Test
+       public void testReplicatedSourceToCross() throws Exception {
+               /*
+                * Test replicated source going into cross
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple1<Long>> source1 = env.createInput(new ReplicatingInputFormat<Long, GenericInputSplit>
+                               (new ParallelIteratorInputFormat<Long>(new NumberSequenceIterator(0L, 1000L))), BasicTypeInfo.LONG_TYPE_INFO)
+                               .map(new ToTuple());
+               DataSet<Tuple1<Long>> source2 = env.generateSequence(0L, 1000L).map(new ToTuple());
+
+               DataSet<Tuple1<Long>> pairs = source1.cross(source2)
+                               .filter(new FilterFunction<Tuple2<Tuple1<Long>, Tuple1<Long>>>() {
+                                       @Override
+                                       public boolean filter(Tuple2<Tuple1<Long>, Tuple1<Long>> value) throws Exception {
+                                               return value.f0.f0.equals(value.f1.f0);
+                                       }
+                               })
+                               .map(new MapFunction<Tuple2<Tuple1<Long>, Tuple1<Long>>, Tuple1<Long>>() {
+                                       @Override
+                                       public Tuple1<Long> map(Tuple2<Tuple1<Long>, Tuple1<Long>> value) throws Exception {
+                                               return value.f0;
+                                       }
+                               })
+                               .sum(0);
+
+               List<Tuple1<Long>> result = pairs.collect();
+
+               String expectedResult = "(500500)";
+
+               compareResultAsText(result, expectedResult);
+       }
+
+       private static class ToTuple implements MapFunction<Long, Tuple1<Long>> {
+
+               @Override
+               public Tuple1<Long> map(Long value) throws Exception {
+                       return new Tuple1<Long>(value);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/SampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/SampleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/SampleITCase.java
new file mode 100644
index 0000000..c0cc62a
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/SampleITCase.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.FlatMapOperator;
+import org.apache.flink.api.java.operators.MapPartitionOperator;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration tests for {@link DataSetUtils#sample}.
+ */
+@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
+public class SampleITCase extends MultipleProgramsTestBase {
+
+       private static final Random RNG = new Random();
+
+       public SampleITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Before
+       public void initiate() {
+               ExecutionEnvironment.getExecutionEnvironment().setParallelism(5);
+       }
+
+       @Test
+       public void testSamplerWithFractionWithoutReplacement() throws Exception {
+               verifySamplerWithFractionWithoutReplacement(0d);
+               verifySamplerWithFractionWithoutReplacement(0.2d);
+               verifySamplerWithFractionWithoutReplacement(1.0d);
+       }
+
+       @Test
+       public void testSamplerWithFractionWithReplacement() throws Exception {
+               verifySamplerWithFractionWithReplacement(0d);
+               verifySamplerWithFractionWithReplacement(0.2d);
+               verifySamplerWithFractionWithReplacement(1.0d);
+               verifySamplerWithFractionWithReplacement(2.0d);
+       }
+
+       @Test
+       public void testSamplerWithSizeWithoutReplacement() throws Exception {
+               verifySamplerWithFixedSizeWithoutReplacement(0);
+               verifySamplerWithFixedSizeWithoutReplacement(2);
+               verifySamplerWithFixedSizeWithoutReplacement(21);
+       }
+
+       @Test
+       public void testSamplerWithSizeWithReplacement() throws Exception {
+               verifySamplerWithFixedSizeWithReplacement(0);
+               verifySamplerWithFixedSizeWithReplacement(2);
+               verifySamplerWithFixedSizeWithReplacement(21);
+       }
+
+       private void verifySamplerWithFractionWithoutReplacement(double fraction) throws Exception {
+               verifySamplerWithFractionWithoutReplacement(fraction, RNG.nextLong());
+       }
+
+       private void verifySamplerWithFractionWithoutReplacement(double fraction, long seed) throws Exception {
+               verifySamplerWithFraction(false, fraction, seed);
+       }
+
+       private void verifySamplerWithFractionWithReplacement(double fraction) throws Exception {
+               verifySamplerWithFractionWithReplacement(fraction, RNG.nextLong());
+       }
+
+       private void verifySamplerWithFractionWithReplacement(double fraction, long seed) throws Exception {
+               verifySamplerWithFraction(true, fraction, seed);
+       }
+
+       private void verifySamplerWithFraction(boolean withReplacement, double fraction, long seed) throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               FlatMapOperator<Tuple3<Integer, Long, String>, String> ds = getSourceDataSet(env);
+               MapPartitionOperator<String, String> sampled = DataSetUtils.sample(ds, withReplacement, fraction, seed);
+               List<String> result = sampled.collect();
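+               // the size of a fraction-based sample is random, so only verify that all
+               // sampled elements stem from the source data set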
+               containsResultAsText(result, getSourceStrings());
+       }
+
+       private void verifySamplerWithFixedSizeWithoutReplacement(int numSamples) throws Exception {
+               verifySamplerWithFixedSizeWithoutReplacement(numSamples, RNG.nextLong());
+       }
+
+       private void verifySamplerWithFixedSizeWithoutReplacement(int numSamples, long seed) throws Exception {
+               verifySamplerWithFixedSize(false, numSamples, seed);
+       }
+
+       private void verifySamplerWithFixedSizeWithReplacement(int numSamples) throws Exception {
+               verifySamplerWithFixedSizeWithReplacement(numSamples, RNG.nextLong());
+       }
+
+       private void verifySamplerWithFixedSizeWithReplacement(int numSamples, 
long seed) throws Exception {
+               verifySamplerWithFixedSize(true, numSamples, seed);
+       }
+
+       private void verifySamplerWithFixedSize(boolean withReplacement, int 
numSamples, long seed) throws Exception {
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               FlatMapOperator<Tuple3<Integer, Long, String>, String> ds = 
getSourceDataSet(env);
+               DataSet<String> sampled = DataSetUtils.sampleWithSize(ds, 
withReplacement, numSamples, seed);
+               List<String> result = sampled.collect();
+               assertEquals(numSamples, result.size());
+               containsResultAsText(result, getSourceStrings());
+       }
+
+       private FlatMapOperator<Tuple3<Integer, Long, String>, String> 
getSourceDataSet(ExecutionEnvironment env) {
+               return CollectionDataSets.get3TupleDataSet(env).flatMap(
+                       new FlatMapFunction<Tuple3<Integer, Long, String>, 
String>() {
+                               @Override
+                               public void flatMap(Tuple3<Integer, Long, 
String> value, Collector<String> out) throws Exception {
+                                       out.collect(value.f2);
+                               }
+                       });
+       }
+
+       private String getSourceStrings() {
+               return "Hi\n" +
+                       "Hello\n" +
+                       "Hello world\n" +
+                       "Hello world, how are you?\n" +
+                       "I am fine.\n" +
+                       "Luke Skywalker\n" +
+                       "Comment#1\n" +
+                       "Comment#2\n" +
+                       "Comment#3\n" +
+                       "Comment#4\n" +
+                       "Comment#5\n" +
+                       "Comment#6\n" +
+                       "Comment#7\n" +
+                       "Comment#8\n" +
+                       "Comment#9\n" +
+                       "Comment#10\n" +
+                       "Comment#11\n" +
+                       "Comment#12\n" +
+                       "Comment#13\n" +
+                       "Comment#14\n" +
+                       "Comment#15\n";
+       }
+}
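
A note on the behavior exercised above: fraction-based sampling keeps each element with the given probability, so the output size is only approximate, which is why verifySamplerWithFraction checks containment rather than an exact count, while sampleWithSize pays an extra pass to return an exact number of elements. A minimal, self-contained sketch of driving the same utilities (the input values are illustrative, not taken from CollectionDataSets):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.utils.DataSetUtils;

    public class SampleSketch {
            public static void main(String[] args) throws Exception {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<Integer> input = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

                    // fraction-based sampling: each element is kept with probability ~0.3,
                    // so the result size varies from run to run (seed fixed here)
                    DataSet<Integer> approx = DataSetUtils.sample(input, false, 0.3, 42L);

                    // size-based sampling: exactly 3 elements, at the cost of an extra pass
                    DataSet<Integer> exact = DataSetUtils.sampleWithSize(input, false, 3, 42L);

                    System.out.println(approx.collect());
                    System.out.println(exact.collect());
            }
    }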

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/SortPartitionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/SortPartitionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/operators/SortPartitionITCase.java
new file mode 100644
index 0000000..a44f28c
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/SortPartitionITCase.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets.POJO;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Tests for {@link DataSet#sortPartition}.
+ */
+@RunWith(Parameterized.class)
+public class SortPartitionITCase extends MultipleProgramsTestBase {
+
+       public SortPartitionITCase(TestExecutionMode mode){
+               super(mode);
+       }
+
+       @Test
+       public void testSortPartitionByKeyField() throws Exception {
+               /*
+                * Test sort partition on key field
+                */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		List<Tuple1<Boolean>> result = ds
+				.map(new IdMapper<Tuple3<Integer, Long, String>>()).setParallelism(4) // parallelize input
+				.sortPartition(1, Order.DESCENDING)
+				.mapPartition(new OrderCheckMapper<>(new Tuple3Checker()))
+				.distinct().collect();
+
+               String expected = "(true)\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       @Test
+       public void testSortPartitionByTwoKeyFields() throws Exception {
+               /*
+                * Test sort partition on two key fields
+                */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(2);
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		List<Tuple1<Boolean>> result = ds
+				.map(new IdMapper<Tuple5<Integer, Long, Integer, String, Long>>()).setParallelism(2) // parallelize input
+				.sortPartition(4, Order.ASCENDING)
+				.sortPartition(2, Order.DESCENDING)
+				.mapPartition(new OrderCheckMapper<>(new Tuple5Checker()))
+				.distinct().collect();
+
+               String expected = "(true)\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       @SuppressWarnings({ "rawtypes", "unchecked" })
+       @Test
+       public void testSortPartitionByFieldExpression() throws Exception {
+               /*
+                * Test sort partition on field expression
+                */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		List<Tuple1<Boolean>> result = ds
+				.map(new IdMapper()).setParallelism(4) // parallelize input
+				.sortPartition("f1", Order.DESCENDING)
+				.mapPartition(new OrderCheckMapper<>(new Tuple3Checker()))
+				.distinct().collect();
+
+               String expected = "(true)\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       @Test
+       public void testSortPartitionByTwoFieldExpressions() throws Exception {
+               /*
+                * Test sort partition on two field expressions
+                */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(2);
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		List<Tuple1<Boolean>> result = ds
+				.map(new IdMapper<Tuple5<Integer, Long, Integer, String, Long>>()).setParallelism(2) // parallelize input
+				.sortPartition("f4", Order.ASCENDING)
+				.sortPartition("f2", Order.DESCENDING)
+				.mapPartition(new OrderCheckMapper<>(new Tuple5Checker()))
+				.distinct().collect();
+
+               String expected = "(true)\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       @Test
+	public void testSortPartitionByNestedFieldExpression() throws Exception {
+		/*
+		 * Test sort partition on nested field expressions
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(3);
+
+		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
+		List<Tuple1<Boolean>> result = ds
+				.map(new IdMapper<Tuple2<Tuple2<Integer, Integer>, String>>()).setParallelism(3) // parallelize input
+				.sortPartition("f0.f1", Order.ASCENDING)
+				.sortPartition("f1", Order.DESCENDING)
+				.mapPartition(new OrderCheckMapper<>(new NestedTupleChecker()))
+				.distinct().collect();
+
+               String expected = "(true)\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       @Test
+	public void testSortPartitionPojoByNestedFieldExpression() throws Exception {
+		/*
+		 * Test sort partition on a nested POJO field expression
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(3);
+
+		DataSet<POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);
+		List<Tuple1<Boolean>> result = ds
+				.map(new IdMapper<POJO>()).setParallelism(1) // collect input into a single partition
+				.sortPartition("nestedTupleWithCustom.f1.myString", Order.ASCENDING)
+				.sortPartition("number", Order.DESCENDING)
+				.mapPartition(new OrderCheckMapper<>(new PojoChecker()))
+				.distinct().collect();
+
+               String expected = "(true)\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       @Test
+       public void testSortPartitionParallelismChange() throws Exception {
+               /*
+                * Test sort partition with parallelism change
+                */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(3);
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		List<Tuple1<Boolean>> result = ds
+				.sortPartition(1, Order.DESCENDING).setParallelism(3) // change parallelism
+				.mapPartition(new OrderCheckMapper<>(new Tuple3Checker()))
+				.distinct().collect();
+
+               String expected = "(true)\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       @Test
+       public void testSortPartitionWithKeySelector1() throws Exception {
+               /*
+                * Test sort partition on an extracted key
+                */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		List<Tuple1<Boolean>> result = ds
+			.map(new IdMapper<Tuple3<Integer, Long, String>>()).setParallelism(4) // parallelize input
+			.sortPartition(new KeySelector<Tuple3<Integer, Long, String>, Long>() {
+				@Override
+				public Long getKey(Tuple3<Integer, Long, String> value) throws Exception {
+					return value.f1;
+				}
+			}, Order.ASCENDING)
+			.mapPartition(new OrderCheckMapper<>(new Tuple3AscendingChecker()))
+			.distinct().collect();
+
+               String expected = "(true)\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       @Test
+       public void testSortPartitionWithKeySelector2() throws Exception {
+               /*
+                * Test sort partition on an extracted key
+                */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		List<Tuple1<Boolean>> result = ds
+			.map(new IdMapper<Tuple3<Integer, Long, String>>()).setParallelism(4) // parallelize input
+			.sortPartition(new KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>>() {
+				@Override
+				public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> value) throws Exception {
+					return new Tuple2<>(value.f0, value.f1);
+				}
+			}, Order.DESCENDING)
+			.mapPartition(new OrderCheckMapper<>(new Tuple3Checker()))
+			.distinct().collect();
+
+               String expected = "(true)\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       private interface OrderChecker<T> extends Serializable {
+               boolean inOrder(T t1, T t2);
+       }
+
+	@SuppressWarnings("serial")
+	private static class Tuple3Checker implements OrderChecker<Tuple3<Integer, Long, String>> {
+		@Override
+		public boolean inOrder(Tuple3<Integer, Long, String> t1, Tuple3<Integer, Long, String> t2) {
+			return t1.f1 >= t2.f1;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static class Tuple3AscendingChecker implements OrderChecker<Tuple3<Integer, Long, String>> {
+		@Override
+		public boolean inOrder(Tuple3<Integer, Long, String> t1, Tuple3<Integer, Long, String> t2) {
+			return t1.f1 <= t2.f1;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static class Tuple5Checker implements OrderChecker<Tuple5<Integer, Long, Integer, String, Long>> {
+		@Override
+		public boolean inOrder(Tuple5<Integer, Long, Integer, String, Long> t1,
+				Tuple5<Integer, Long, Integer, String, Long> t2) {
+			return t1.f4 < t2.f4 || (t1.f4.equals(t2.f4) && t1.f2 >= t2.f2);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static class NestedTupleChecker implements OrderChecker<Tuple2<Tuple2<Integer, Integer>, String>> {
+		@Override
+		public boolean inOrder(Tuple2<Tuple2<Integer, Integer>, String> t1,
+				Tuple2<Tuple2<Integer, Integer>, String> t2) {
+			return t1.f0.f1 < t2.f0.f1 ||
+					(t1.f0.f1.equals(t2.f0.f1) && t1.f1.compareTo(t2.f1) >= 0);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static class PojoChecker implements OrderChecker<POJO> {
+		@Override
+		public boolean inOrder(POJO t1, POJO t2) {
+			int cmp = t1.nestedTupleWithCustom.f1.myString.compareTo(t2.nestedTupleWithCustom.f1.myString);
+			return cmp < 0 || (cmp == 0 && t1.number >= t2.number);
+		}
+	}
+
+	@SuppressWarnings({"unused", "serial"})
+	private static class OrderCheckMapper<T> implements MapPartitionFunction<T, Tuple1<Boolean>> {
+
+		OrderChecker<T> checker;
+
+		public OrderCheckMapper() {}
+
+		public OrderCheckMapper(OrderChecker<T> checker) {
+			this.checker = checker;
+		}
+
+		@Override
+		public void mapPartition(Iterable<T> values, Collector<Tuple1<Boolean>> out) throws Exception {
+
+			Iterator<T> it = values.iterator();
+			if (!it.hasNext()) {
+				// an empty partition is trivially ordered
+				out.collect(new Tuple1<>(true));
+			} else {
+				T last = it.next();
+
+				while (it.hasNext()) {
+					T next = it.next();
+					if (!checker.inOrder(last, next)) {
+						out.collect(new Tuple1<>(false));
+						return;
+					}
+					last = next;
+				}
+				out.collect(new Tuple1<>(true));
+			}
+		}
+	}
+
+       @SuppressWarnings("serial")
+       private static class IdMapper<T> implements MapFunction<T, T> {
+
+               @Override
+               public T map(T value) throws Exception {
+                       return value;
+               }
+       }
+}
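
Worth noting for readers of these tests: sortPartition establishes order only within each parallel partition, never globally, which is why every test validates per-partition order with a MapPartitionFunction (OrderCheckMapper) and reduces the resulting booleans with distinct() instead of comparing against a globally sorted output. A minimal sketch of the operator on illustrative data:

    import org.apache.flink.api.common.operators.Order;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class SortPartitionSketch {
            public static void main(String[] args) throws Exception {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    env.setParallelism(2);

                    DataSet<Tuple2<Integer, String>> input = env.fromElements(
                            new Tuple2<>(3, "c"), new Tuple2<>(1, "a"), new Tuple2<>(2, "b"));

                    // each of the 2 partitions is sorted on field 0 independently;
                    // elements from different partitions may still interleave in the output
                    input.sortPartition(0, Order.ASCENDING).print();
            }
    }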

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/SumMinMaxITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/SumMinMaxITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/operators/SumMinMaxITCase.java
new file mode 100644
index 0000000..ebec17b
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/SumMinMaxITCase.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Integration tests for {@link DataSet#sum}, {@link DataSet#min} and
+ * {@link DataSet#max}.
+ */
+@RunWith(Parameterized.class)
+public class SumMinMaxITCase extends MultipleProgramsTestBase {
+
+       public SumMinMaxITCase(TestExecutionMode mode){
+               super(mode);
+       }
+
+       @Test
+       public void testSumMaxAndProject() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple2<Integer, Long>> sumDs = ds
+                               .sum(0)
+                               .andMax(1)
+                               .project(0, 1);
+
+               List<Tuple2<Integer, Long>> result = sumDs.collect();
+
+               String expected = "231,6\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testGroupedAggregate() throws Exception {
+               /*
+                * Grouped Aggregate
+                */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
+                               .sum(0)
+                               .project(1, 0);
+
+               List<Tuple2<Long, Integer>> result = aggregateDs.collect();
+
+               String expected = "1,1\n" +
+                               "2,5\n" +
+                               "3,15\n" +
+                               "4,34\n" +
+                               "5,65\n" +
+                               "6,111\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testNestedAggregate() throws Exception {
+               /*
+                * Nested Aggregate
+                */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1)
+                               .min(0)
+                               .min(0)
+                               .project(0);
+
+               List<Tuple1<Integer>> result = aggregateDs.collect();
+
+               String expected = "1\n";
+
+               compareResultAsTuples(result, expected);
+       }
+}
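
For readers unfamiliar with the chained aggregation above: ds.sum(0).andMax(1) computes both aggregates together in one pass over the whole (ungrouped) data set, yielding a single tuple, which is what the hard-coded expectation "231,6" reflects (231 is the sum of 1..21, 6 the maximum of field 1). A minimal sketch with illustrative values:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class AggregateSketch {
            public static void main(String[] args) throws Exception {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

                    DataSet<Tuple2<Integer, Integer>> input = env.fromElements(
                            new Tuple2<>(1, 10), new Tuple2<>(2, 30), new Tuple2<>(3, 20));

                    // sum of field 0 combined with max of field 1: prints (6,30)
                    input.sum(0).andMax(1).print();
            }
    }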

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java
new file mode 100644
index 0000000..75bf8f0
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Integration tests for {@link org.apache.flink.api.common.typeinfo.TypeHint}.
+ */
+@RunWith(Parameterized.class)
+public class TypeHintITCase extends JavaProgramTestBase {
+
+       private static final int NUM_PROGRAMS = 9;
+
+       private int curProgId = config.getInteger("ProgramId", -1);
+
+       public TypeHintITCase(Configuration config) {
+               super(config);
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               TypeHintProgs.runProgram(curProgId);
+       }
+
+	@Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+		for (int i = 1; i <= NUM_PROGRAMS; i++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+
+		return toParameterList(tConfigs);
+	}
+
+       private static class TypeHintProgs {
+
+               public static void runProgram(int progId) throws Exception {
+			switch (progId) {
+			// Test identity map with missing types and string type hint
+			case 1: {
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+				DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds
+						.map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
+						.returns("Tuple3<Integer, Long, String>");
+				List<Tuple3<Integer, Long, String>> result = identityMapDs.collect();
+
+				String expectedResult = "(2,2,Hello)\n" +
+						"(3,2,Hello world)\n" +
+						"(1,1,Hi)\n";
+
+				compareResultAsText(result, expectedResult);
+				break;
+			}
+			// Test identity map with missing types and type information type hint
+			case 2: {
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+				DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds
+						// all following generics get erased during compilation
+						.map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
+						.returns(new TupleTypeInfo<Tuple3<Integer, Long, String>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
+				List<Tuple3<Integer, Long, String>> result = identityMapDs.collect();
+
+				String expectedResult = "(2,2,Hello)\n" +
+						"(3,2,Hello world)\n" +
+						"(1,1,Hi)\n";
+
+				compareResultAsText(result, expectedResult);
+				break;
+			}
+			// Test flat map with class type hint
+			case 3: {
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+				DataSet<Integer> identityMapDs = ds
+						.flatMap(new FlatMapper<Tuple3<Integer, Long, String>, Integer>())
+						.returns(Integer.class);
+				List<Integer> result = identityMapDs.collect();
+
+				String expectedResult = "2\n" +
+						"3\n" +
+						"1\n";
+
+				compareResultAsText(result, expectedResult);
+				break;
+			}
+			// Test join with type information type hint
+			case 4: {
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+				DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
+				DataSet<Integer> resultDs = ds1
+						.join(ds2)
+						.where(0)
+						.equalTo(0)
+						.with(new Joiner<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
+						.returns(BasicTypeInfo.INT_TYPE_INFO);
+				List<Integer> result = resultDs.collect();
+
+				String expectedResult = "2\n" +
+						"3\n" +
+						"1\n";
+
+				compareResultAsText(result, expectedResult);
+				break;
+			}
+			// Test flat join with type information type hint
+			case 5: {
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+				DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
+				DataSet<Integer> resultDs = ds1
+						.join(ds2)
+						.where(0)
+						.equalTo(0)
+						.with(new FlatJoiner<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
+						.returns(BasicTypeInfo.INT_TYPE_INFO);
+				List<Integer> result = resultDs.collect();
+
+				String expectedResult = "2\n" +
+						"3\n" +
+						"1\n";
+
+				compareResultAsText(result, expectedResult);
+				break;
+			}
+			// Test unsorted group reduce with type information type hint
+			case 6: {
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+				DataSet<Integer> resultDs = ds
+						.groupBy(0)
+						.reduceGroup(new GroupReducer<Tuple3<Integer, Long, String>, Integer>())
+						.returns(BasicTypeInfo.INT_TYPE_INFO);
+				List<Integer> result = resultDs.collect();
+
+				String expectedResult = "2\n" +
+						"3\n" +
+						"1\n";
+
+				compareResultAsText(result, expectedResult);
+				break;
+			}
+			// Test sorted group reduce with type information type hint
+			case 7: {
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+				DataSet<Integer> resultDs = ds
+						.groupBy(0)
+						.sortGroup(0, Order.ASCENDING)
+						.reduceGroup(new GroupReducer<Tuple3<Integer, Long, String>, Integer>())
+						.returns(BasicTypeInfo.INT_TYPE_INFO);
+				List<Integer> result = resultDs.collect();
+
+				String expectedResult = "2\n" +
+						"3\n" +
+						"1\n";
+
+				compareResultAsText(result, expectedResult);
+				break;
+			}
+			// Test combine group with type information type hint
+			case 8: {
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+				DataSet<Integer> resultDs = ds
+						.groupBy(0)
+						.combineGroup(new GroupCombiner<Tuple3<Integer, Long, String>, Integer>())
+						.returns(BasicTypeInfo.INT_TYPE_INFO);
+				List<Integer> result = resultDs.collect();
+
+				String expectedResult = "2\n" +
+						"3\n" +
+						"1\n";
+
+				compareResultAsText(result, expectedResult);
+				break;
+			}
+			// Test cogroup with type information type hint
+			case 9: {
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+				DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
+				DataSet<Integer> resultDs = ds1
+						.coGroup(ds2)
+						.where(0)
+						.equalTo(0)
+						.with(new CoGrouper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
+						.returns(BasicTypeInfo.INT_TYPE_INFO);
+				List<Integer> result = resultDs.collect();
+
+				String expectedResult = "2\n" +
+						"3\n" +
+						"1\n";
+
+				compareResultAsText(result, expectedResult);
+				break;
+			}
+			default:
+				throw new IllegalArgumentException("Invalid program id");
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+       private static class Mapper<T, V> implements MapFunction<T, V> {
+               private static final long serialVersionUID = 1L;
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public V map(T value) throws Exception {
+                       return (V) value;
+               }
+       }
+
+	private static class FlatMapper<T, V> implements FlatMapFunction<T, V> {
+		private static final long serialVersionUID = 1L;
+
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		@Override
+		public void flatMap(T value, Collector<V> out) throws Exception {
+			out.collect((V) ((Tuple3) value).f0);
+		}
+	}
+
+	private static class Joiner<IN1, IN2, OUT> implements JoinFunction<IN1, IN2, OUT> {
+		private static final long serialVersionUID = 1L;
+
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		@Override
+		public OUT join(IN1 first, IN2 second) throws Exception {
+			return (OUT) ((Tuple3) first).f0;
+		}
+	}
+
+	private static class FlatJoiner<IN1, IN2, OUT> implements FlatJoinFunction<IN1, IN2, OUT> {
+		private static final long serialVersionUID = 1L;
+
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		@Override
+		public void join(IN1 first, IN2 second, Collector<OUT> out) throws Exception {
+			out.collect((OUT) ((Tuple3) first).f0);
+		}
+	}
+
+	private static class GroupReducer<IN, OUT> implements GroupReduceFunction<IN, OUT> {
+		private static final long serialVersionUID = 1L;
+
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		@Override
+		public void reduce(Iterable<IN> values, Collector<OUT> out) throws Exception {
+			out.collect((OUT) ((Tuple3) values.iterator().next()).f0);
+		}
+	}
+
+	private static class GroupCombiner<IN, OUT> implements GroupCombineFunction<IN, OUT> {
+		private static final long serialVersionUID = 1L;
+
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		@Override
+		public void combine(Iterable<IN> values, Collector<OUT> out) throws Exception {
+			out.collect((OUT) ((Tuple3) values.iterator().next()).f0);
+		}
+	}
+
+	private static class CoGrouper<IN1, IN2, OUT> implements CoGroupFunction<IN1, IN2, OUT> {
+		private static final long serialVersionUID = 1L;
+
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		@Override
+		public void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<OUT> out) throws Exception {
+			out.collect((OUT) ((Tuple3) first.iterator().next()).f0);
+		}
+	}
+
+}
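
The three hint styles above (a type string, a Class, and an explicit TypeInformation) all feed the same returns(...) mechanism that compensates for erased generics. The same hint can also be supplied via a TypeHint, which captures the generic type through an anonymous subclass; a minimal sketch, assuming a Flink version where returns(TypeHint) is available:

    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class TypeHintSketch {
            public static void main(String[] args) throws Exception {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

                    DataSet<Tuple2<Integer, String>> out = env.fromElements("a", "bb")
                            // the lambda's return type is erased, so a hint is required
                            .map(s -> new Tuple2<>(s.length(), s))
                            .returns(new TypeHint<Tuple2<Integer, String>>() {});

                    out.print();
            }
    }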

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/UnionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/UnionITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/operators/UnionITCase.java
new file mode 100644
index 0000000..daa9cb1
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/UnionITCase.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Integration tests for {@link DataSet#union}.
+ */
+@RunWith(Parameterized.class)
+public class UnionITCase extends MultipleProgramsTestBase {
+
+       private static final String FULL_TUPLE_3_STRING = "1,1,Hi\n" +
+                       "2,2,Hello\n" +
+                       "3,2,Hello world\n" +
+                       "4,3,Hello world, how are you?\n" +
+                       "5,3,I am fine.\n" +
+                       "6,3,Luke Skywalker\n" +
+                       "7,4,Comment#1\n" +
+                       "8,4,Comment#2\n" +
+                       "9,4,Comment#3\n" +
+                       "10,4,Comment#4\n" +
+                       "11,5,Comment#5\n" +
+                       "12,5,Comment#6\n" +
+                       "13,5,Comment#7\n" +
+                       "14,5,Comment#8\n" +
+                       "15,5,Comment#9\n" +
+                       "16,6,Comment#10\n" +
+                       "17,6,Comment#11\n" +
+                       "18,6,Comment#12\n" +
+                       "19,6,Comment#13\n" +
+                       "20,6,Comment#14\n" +
+                       "21,6,Comment#15\n";
+
+       public UnionITCase(TestExecutionMode mode){
+               super(mode);
+       }
+
+       @Test
+       public void testUnion2IdenticalDataSets() throws Exception {
+		/*
+		 * Union of two identical data sets
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env));
+
+               List<Tuple3<Integer, Long, String>> result = unionDs.collect();
+
+               String expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING;
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testUnion5IdenticalDataSets() throws Exception {
+		/*
+		 * Union of five identical data sets via chained unions
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env))
+				.union(CollectionDataSets.get3TupleDataSet(env))
+				.union(CollectionDataSets.get3TupleDataSet(env))
+				.union(CollectionDataSets.get3TupleDataSet(env));
+
+		List<Tuple3<Integer, Long, String>> result = unionDs.collect();
+
+		String expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING
+				+ FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING;
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testUnionWithEmptyDataSet() throws Exception {
+		/*
+		 * Test union with an empty data set
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// create an empty data set by filtering out every element
+		DataSet<Tuple3<Integer, Long, String>> empty = CollectionDataSets.get3TupleDataSet(env)
+				.filter(new RichFilter1());
+
+		DataSet<Tuple3<Integer, Long, String>> unionDs = CollectionDataSets.get3TupleDataSet(env)
+				.union(empty);
+
+               List<Tuple3<Integer, Long, String>> result = unionDs.collect();
+
+               String expected = FULL_TUPLE_3_STRING;
+
+               compareResultAsTuples(result, expected);
+       }
+
+	private static class RichFilter1 extends RichFilterFunction<Tuple3<Integer, Long, String>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
+			return false;
+		}
+	}
+
+}
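
As the expectations above make explicit, DataSet#union has bag semantics (the equivalent of SQL's UNION ALL): duplicates are kept, so unioning a data set with itself doubles every record. A minimal sketch with illustrative values:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class UnionSketch {
            public static void main(String[] args) throws Exception {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

                    DataSet<Integer> a = env.fromElements(1, 2, 3);
                    DataSet<Integer> b = env.fromElements(3, 4);

                    // bag union: 3 appears twice; chain distinct() for set semantics
                    a.union(b).print();
            }
    }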
