http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java
new file mode 100644
index 0000000..2452475
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for custom {@link DataDistribution}.
+ */
+@SuppressWarnings("serial")
+public class CustomDistributionITCase extends TestLogger {
+
+       // ------------------------------------------------------------------------
+       //  The mini cluster that is shared across tests
+       // ------------------------------------------------------------------------
+
+       private static LocalFlinkMiniCluster cluster;
+
+       @BeforeClass
+       public static void setup() throws Exception {
+               cluster = TestBaseUtils.startCluster(1, 8, false, false, true);
+       }
+
+       @AfterClass
+       public static void teardown() throws Exception {
+               TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+       }
+
+       @Before
+       public void prepare() {
+               TestEnvironment clusterEnv = new TestEnvironment(cluster, 1, false);
+               clusterEnv.setAsContext();
+       }
+
+       @After
+       public void cleanup() {
+               TestEnvironment.unsetAsContext();
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Tests that records are correctly partitioned on one field according to the custom data distribution.
+        */
+       @Test
+       public void testPartitionWithDistribution1() throws Exception {
+               final TestDataDist1 dist = new TestDataDist1();
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(dist.getParallelism());
+
+               DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+               DataSet<Boolean> result = DataSetUtils
+                       .partitionByRange(input, dist, 0)
+                       .mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
+
+                               @Override
+                               public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
+                                       int pIdx = getRuntimeContext().getIndexOfThisSubtask();
+
+                                       for (Tuple3<Integer, Long, String> s : values) {
+                                               boolean correctlyPartitioned = true;
+                                               if (pIdx == 0) {
+                                                       // first partition: key must not exceed the first boundary
+                                                       Integer[] upper = dist.boundaries[0];
+                                                       if (s.f0.compareTo(upper[0]) > 0) {
+                                                               correctlyPartitioned = false;
+                                                       }
+                                               }
+                                               else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) {
+                                                       // inner partitions: key must lie in (lower, upper]
+                                                       Integer[] lower = dist.boundaries[pIdx - 1];
+                                                       Integer[] upper = dist.boundaries[pIdx];
+                                                       if (s.f0.compareTo(upper[0]) > 0 || (s.f0.compareTo(lower[0]) <= 0)) {
+                                                               correctlyPartitioned = false;
+                                                       }
+                                               }
+                                               else {
+                                                       // last partition: key must exceed the last boundary
+                                                       Integer[] lower = dist.boundaries[pIdx - 1];
+                                                       if ((s.f0.compareTo(lower[0]) <= 0)) {
+                                                               correctlyPartitioned = false;
+                                                       }
+                                               }
+
+                                               if (!correctlyPartitioned) {
+                                                       fail("Record was not correctly partitioned: " + s.toString());
+                                               }
+                                       }
+                               }
+                       }
+                       );
+
+               result.output(new DiscardingOutputFormat<Boolean>());
+               env.execute();
+       }
+
+       /**
+        * Tests that records are correctly partitioned on two fields according to the custom data distribution.
+        */
+       @Test
+       public void testRangeWithDistribution2() throws Exception {
+               final TestDataDist2 dist = new TestDataDist2();
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(dist.getParallelism());
+
+               DataSet<Tuple3<Integer, Integer, String>> input = env.fromElements(
+                                               new Tuple3<>(1, 5, "Hi"),
+                                               new Tuple3<>(1, 6, "Hi"),
+                                               new Tuple3<>(1, 7, "Hi"),
+                                               new Tuple3<>(1, 11, "Hello"),
+                                               new Tuple3<>(2, 3, "World"),
+                                               new Tuple3<>(2, 4, "World"),
+                                               new Tuple3<>(2, 5, "World"),
+                                               new Tuple3<>(2, 13, "Hello World"),
+                                               new Tuple3<>(3, 8, "Say"),
+                                               new Tuple3<>(4, 0, "Why"),
+                                               new Tuple3<>(4, 2, "Java"),
+                                               new Tuple3<>(4, 11, "Say Hello"),
+                                               new Tuple3<>(5, 1, "Hi Java!"),
+                                               new Tuple3<>(5, 2, "Hi Java?"),
+                                               new Tuple3<>(5, 3, "Hi Java again")
+                       );
+
+               DataSet<Boolean> result = DataSetUtils
+                       .partitionByRange(input, dist, 0, 1)
+                       .mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean>() {
+
+                               @Override
+                               public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Collector<Boolean> out) throws Exception {
+                                       int pIdx = getRuntimeContext().getIndexOfThisSubtask();
+                                       boolean correctlyPartitioned = true;
+
+                                       for (Tuple3<Integer, Integer, String> s : values) {
+
+                                               if (pIdx == 0) {
+                                                       // first partition: (f0, f1) must not exceed the first composite boundary
+                                                       Integer[] upper = dist.boundaries[0];
+                                                       if (s.f0.compareTo(upper[0]) > 0 ||
+                                                               (s.f0.compareTo(upper[0]) == 0 && s.f1.compareTo(upper[1]) > 0)) {
+                                                               correctlyPartitioned = false;
+                                                       }
+                                               }
+                                               else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) {
+                                                       // inner partitions: (f0, f1) must lie in (lower, upper]
+                                                       Integer[] lower = dist.boundaries[pIdx - 1];
+                                                       Integer[] upper = dist.boundaries[pIdx];
+
+                                                       if (s.f0.compareTo(upper[0]) > 0 ||
+                                                               (s.f0.compareTo(upper[0]) == 0 && s.f1.compareTo(upper[1]) > 0) ||
+                                                               (s.f0.compareTo(lower[0]) < 0) ||
+                                                               (s.f0.compareTo(lower[0]) == 0 && s.f1.compareTo(lower[1]) <= 0)) {
+                                                               correctlyPartitioned = false;
+                                                       }
+                                               }
+                                               else {
+                                                       // last partition: (f0, f1) must exceed the last composite boundary
+                                                       Integer[] lower = dist.boundaries[pIdx - 1];
+                                                       if ((s.f0.compareTo(lower[0]) < 0) ||
+                                                               (s.f0.compareTo(lower[0]) == 0 && s.f1.compareTo(lower[1]) <= 0)) {
+                                                               correctlyPartitioned = false;
+                                                       }
+                                               }
+
+                                               if (!correctlyPartitioned) {
+                                                       fail("Record was not correctly partitioned: " + s.toString());
+                                               }
+                                       }
+                               }
+                       }
+                       );
+
+               result.output(new DiscardingOutputFormat<Boolean>());
+               env.execute();
+       }
+
+       /*
+        * Tests range partitioning where the number of partition keys is smaller than the number of distribution fields.
+        */
+       @Test
+       public void testPartitionKeyLessDistribution() throws Exception {
+               final TestDataDist2 dist = new TestDataDist2();
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(dist.getParallelism());
+
+               DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+               DataSet<Boolean> result = DataSetUtils
+                       .partitionByRange(input, dist, 0)
+                       .mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
+
+                               @Override
+                               public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
+                                       int pIdx = getRuntimeContext().getIndexOfThisSubtask();
+
+                                       for (Tuple3<Integer, Long, String> s : values) {
+                                               boolean correctlyPartitioned = true;
+                                               if (pIdx == 0) {
+                                                       Integer[] upper = dist.boundaries[0];
+                                                       if (s.f0.compareTo(upper[0]) > 0) {
+                                                               correctlyPartitioned = false;
+                                                       }
+                                               }
+                                               else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) {
+                                                       Integer[] lower = dist.boundaries[pIdx - 1];
+                                                       Integer[] upper = dist.boundaries[pIdx];
+                                                       if (s.f0.compareTo(upper[0]) > 0 || (s.f0.compareTo(lower[0]) <= 0)) {
+                                                               correctlyPartitioned = false;
+                                                       }
+                                               }
+                                               else {
+                                                       Integer[] lower = dist.boundaries[pIdx - 1];
+                                                       if ((s.f0.compareTo(lower[0]) <= 0)) {
+                                                               correctlyPartitioned = false;
+                                                       }
+                                               }
+
+                                               if (!correctlyPartitioned) {
+                                                       fail("Record was not correctly partitioned: " + s.toString());
+                                               }
+                                       }
+                               }
+                       }
+                       );
+
+               result.output(new DiscardingOutputFormat<Boolean>());
+               env.execute();
+       }
+
+       /*
+        * Tests that range partitioning fails when the number of partition keys exceeds the number of distribution fields.
+        */
+       @Test(expected = IllegalArgumentException.class)
+       public void testPartitionMoreThanDistribution() throws Exception {
+               final TestDataDist2 dist = new TestDataDist2();
+
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+               DataSetUtils.partitionByRange(input, dist, 0, 1, 2);
+       }
+
+       /**
+        * A data distribution for testing range partitioning with one key.
+        */
+       public static class TestDataDist1 implements DataDistribution {
+
+               public Integer[][] boundaries = new Integer[][]{
+                       new Integer[]{4},
+                       new Integer[]{9},
+                       new Integer[]{13},
+                       new Integer[]{18}
+               };
+
+               public TestDataDist1() {}
+
+               public int getParallelism() {
+                       return boundaries.length;
+               }
+
+               @Override
+               public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
+                       return boundaries[bucketNum];
+               }
+
+               @Override
+               public int getNumberOfFields() {
+                       return 1;
+               }
+
+               @Override
+               public TypeInformation[] getKeyTypes() {
+                       return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO};
+               }
+
+               @Override
+               public void write(DataOutputView out) throws IOException {}
+
+               @Override
+               public void read(DataInputView in) throws IOException {}
+       }
+
+       /**
+        * A data distribution for testing range partitioning with two keys.
+        */
+       public static class TestDataDist2 implements DataDistribution {
+
+               public Integer[][] boundaries = new Integer[][]{
+                       new Integer[]{1, 6},
+                       new Integer[]{2, 4},
+                       new Integer[]{3, 9},
+                       new Integer[]{4, 1},
+                       new Integer[]{5, 2}
+               };
+
+               public TestDataDist2() {}
+
+               public int getParallelism() {
+                       return boundaries.length;
+               }
+
+               @Override
+               public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
+                       return boundaries[bucketNum];
+               }
+
+               @Override
+               public int getNumberOfFields() {
+                       return 2;
+               }
+
+               @Override
+               public TypeInformation[] getKeyTypes() {
+                       return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO};
+               }
+
+               @Override
+               public void write(DataOutputView out) throws IOException {}
+
+               @Override
+               public void read(DataInputView in) throws IOException {}
+       }
+}
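
A custom DataDistribution, as above, supplies the range boundaries that
DataSetUtils.partitionByRange would otherwise derive by sampling the input. A
minimal driver sketch, assuming a boundary-backed distribution like TestDataDist1
(the example class and sample data here are illustrative, not part of this commit):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.DataSetUtils;

    public class CustomDistributionExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // one subtask per bucket boundary, so each range maps to one partition
            CustomDistributionITCase.TestDataDist1 dist = new CustomDistributionITCase.TestDataDist1();
            env.setParallelism(dist.getParallelism());

            DataSet<Tuple2<Integer, String>> data = env.fromElements(
                    new Tuple2<>(3, "a"), new Tuple2<>(7, "b"), new Tuple2<>(15, "c"));

            // range-partition on field 0 using the fixed boundaries instead of sampling
            DataSetUtils.partitionByRange(data, dist, 0).print();
        }
    }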

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/DataSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/DataSinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/DataSinkITCase.java
new file mode 100644
index 0000000..deb5170
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/DataSinkITCase.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.BufferedReader;
+import java.util.Random;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for data sinks.
+ */
+@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
+public class DataSinkITCase extends MultipleProgramsTestBase {
+
+       public DataSinkITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       private String resultPath;
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception {
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+
+       @Test
+       public void testIntSortingParallelism1() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Integer> ds = CollectionDataSets.getIntegerDataSet(env);
+               ds.writeAsText(resultPath).sortLocalOutput("*", Order.DESCENDING).setParallelism(1);
+
+               env.execute();
+
+               String expected = "5\n5\n5\n5\n5\n4\n4\n4\n4\n3\n3\n3\n2\n2\n1\n";
+               compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
+
+       }
+
+       @Test
+       public void testStringSortingParallelism1() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
+               ds.writeAsText(resultPath).sortLocalOutput("*", Order.ASCENDING).setParallelism(1);
+
+               env.execute();
+
+               String expected = "Hello\n" +
+                               "Hello world\n" +
+                               "Hello world, how are you?\n" +
+                               "Hi\n" +
+                               "I am fine.\n" +
+                               "LOL\n" +
+                               "Luke Skywalker\n" +
+                               "Random comment\n";
+
+               compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
+       }
+
+       @Test
+       public void testTupleSortingSingleAscParallelism1() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               ds.writeAsCsv(resultPath).sortLocalOutput(0, Order.ASCENDING).setParallelism(1);
+
+               env.execute();
+
+               String expected = "1,1,Hi\n" +
+                               "2,2,Hello\n" +
+                               "3,2,Hello world\n" +
+                               "4,3,Hello world, how are you?\n" +
+                               "5,3,I am fine.\n" +
+                               "6,3,Luke Skywalker\n" +
+                               "7,4,Comment#1\n" +
+                               "8,4,Comment#2\n" +
+                               "9,4,Comment#3\n" +
+                               "10,4,Comment#4\n" +
+                               "11,5,Comment#5\n" +
+                               "12,5,Comment#6\n" +
+                               "13,5,Comment#7\n" +
+                               "14,5,Comment#8\n" +
+                               "15,5,Comment#9\n" +
+                               "16,6,Comment#10\n" +
+                               "17,6,Comment#11\n" +
+                               "18,6,Comment#12\n" +
+                               "19,6,Comment#13\n" +
+                               "20,6,Comment#14\n" +
+                               "21,6,Comment#15\n";
+
+               compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
+       }
+
+       @Test
+       public void testTupleSortingSingleDescParallelism1() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               ds.writeAsCsv(resultPath).sortLocalOutput(0, Order.DESCENDING).setParallelism(1);
+
+               env.execute();
+
+               String expected = "21,6,Comment#15\n" +
+                               "20,6,Comment#14\n" +
+                               "19,6,Comment#13\n" +
+                               "18,6,Comment#12\n" +
+                               "17,6,Comment#11\n" +
+                               "16,6,Comment#10\n" +
+                               "15,5,Comment#9\n" +
+                               "14,5,Comment#8\n" +
+                               "13,5,Comment#7\n" +
+                               "12,5,Comment#6\n" +
+                               "11,5,Comment#5\n" +
+                               "10,4,Comment#4\n" +
+                               "9,4,Comment#3\n" +
+                               "8,4,Comment#2\n" +
+                               "7,4,Comment#1\n" +
+                               "6,3,Luke Skywalker\n" +
+                               "5,3,I am fine.\n" +
+                               "4,3,Hello world, how are you?\n" +
+                               "3,2,Hello world\n" +
+                               "2,2,Hello\n" +
+                               "1,1,Hi\n";
+
+               compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
+       }
+
+       @Test
+       public void testTupleSortingDualParallelism1() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               ds.writeAsCsv(resultPath)
+                       .sortLocalOutput(1, Order.DESCENDING).sortLocalOutput(0, Order.ASCENDING)
+                       .setParallelism(1);
+
+               env.execute();
+
+               String expected = "16,6,Comment#10\n" +
+                               "17,6,Comment#11\n" +
+                               "18,6,Comment#12\n" +
+                               "19,6,Comment#13\n" +
+                               "20,6,Comment#14\n" +
+                               "21,6,Comment#15\n" +
+                               "11,5,Comment#5\n" +
+                               "12,5,Comment#6\n" +
+                               "13,5,Comment#7\n" +
+                               "14,5,Comment#8\n" +
+                               "15,5,Comment#9\n" +
+                               "7,4,Comment#1\n" +
+                               "8,4,Comment#2\n" +
+                               "9,4,Comment#3\n" +
+                               "10,4,Comment#4\n" +
+                               "4,3,Hello world, how are you?\n" +
+                               "5,3,I am fine.\n" +
+                               "6,3,Luke Skywalker\n" +
+                               "2,2,Hello\n" +
+                               "3,2,Hello world\n" +
+                               "1,1,Hi\n";
+
+               compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
+       }
+
+       @Test
+       public void testTupleSortingNestedParallelism1() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> ds =
+                               CollectionDataSets.getGroupSortedNestedTupleDataSet2(env);
+               ds.writeAsText(resultPath)
+                       .sortLocalOutput("f0.f1", Order.ASCENDING)
+                       .sortLocalOutput("f1", Order.DESCENDING)
+                       .setParallelism(1);
+
+               env.execute();
+
+               String expected =
+                               "((2,1),a,3)\n" +
+                               "((2,2),b,4)\n" +
+                               "((1,2),a,1)\n" +
+                               "((3,3),c,5)\n" +
+                               "((1,3),a,2)\n" +
+                               "((3,6),c,6)\n" +
+                               "((4,9),c,7)\n";
+
+               compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
+       }
+
+       @Test
+       public void testTupleSortingNestedParallelism1_2() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> ds =
+                               CollectionDataSets.getGroupSortedNestedTupleDataSet2(env);
+               ds.writeAsText(resultPath)
+                       .sortLocalOutput(1, Order.ASCENDING)
+                       .sortLocalOutput(2, Order.DESCENDING)
+                       .setParallelism(1);
+
+               env.execute();
+
+               String expected =
+                               "((2,1),a,3)\n" +
+                               "((1,3),a,2)\n" +
+                               "((1,2),a,1)\n" +
+                               "((2,2),b,4)\n" +
+                               "((4,9),c,7)\n" +
+                               "((3,6),c,6)\n" +
+                               "((3,3),c,5)\n";
+
+               compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
+       }
+
+       @Test
+       public void testPojoSortingSingleParallelism1() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);
+               ds.writeAsText(resultPath).sortLocalOutput("number", Order.ASCENDING).setParallelism(1);
+
+               env.execute();
+
+               String expected = "1 First (10,100,1000,One) 10100\n" +
+                               "2 First_ (10,105,1000,One) 10200\n" +
+                               "3 First (11,102,3000,One) 10200\n" +
+                               "4 First_ (11,106,1000,One) 10300\n" +
+                               "5 First (11,102,2000,One) 10100\n" +
+                               "6 Second_ (20,200,2000,Two) 10100\n" +
+                               "7 Third (31,301,2000,Three) 10200\n" +
+                               "8 Third_ (30,300,1000,Three) 10100\n";
+
+               compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
+       }
+
+       @Test
+       public void testPojoSortingDualParallelism1() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);
+               ds.writeAsText(resultPath)
+                       .sortLocalOutput("str", Order.ASCENDING)
+                       .sortLocalOutput("number", Order.DESCENDING)
+                       .setParallelism(1);
+
+               env.execute();
+
+               String expected =
+                               "5 First (11,102,2000,One) 10100\n" +
+                               "3 First (11,102,3000,One) 10200\n" +
+                               "1 First (10,100,1000,One) 10100\n" +
+                               "4 First_ (11,106,1000,One) 10300\n" +
+                               "2 First_ (10,105,1000,One) 10200\n" +
+                               "6 Second_ (20,200,2000,Two) 10100\n" +
+                               "7 Third (31,301,2000,Three) 10200\n" +
+                               "8 Third_ (30,300,1000,Three) 10100\n";
+
+               compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
+
+       }
+
+       @Test
+       public void testPojoSortingNestedParallelism1() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);
+               ds.writeAsText(resultPath)
+                       .sortLocalOutput("nestedTupleWithCustom.f0", Order.ASCENDING)
+                       .sortLocalOutput("nestedTupleWithCustom.f1.myInt", Order.DESCENDING)
+                       .sortLocalOutput("nestedPojo.longNumber", Order.ASCENDING)
+                       .setParallelism(1);
+
+               env.execute();
+
+               String expected =
+                               "2 First_ (10,105,1000,One) 10200\n" +
+                               "1 First (10,100,1000,One) 10100\n" +
+                               "4 First_ (11,106,1000,One) 10300\n" +
+                               "5 First (11,102,2000,One) 10100\n" +
+                               "3 First (11,102,3000,One) 10200\n" +
+                               "6 Second_ (20,200,2000,Two) 10100\n" +
+                               "8 Third_ (30,300,1000,Three) 10100\n" +
+                               "7 Third (31,301,2000,Three) 10200\n";
+
+               compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
+       }
+
+       @Test
+       public void testSortingParallelism4() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Long> ds = env.generateSequence(0, 1000);
+               // randomize
+               ds.map(new MapFunction<Long, Long>() {
+
+                       Random rand = new Random(1234L);
+                       @Override
+                       public Long map(Long value) throws Exception {
+                               return rand.nextLong();
+                       }
+               }).writeAsText(resultPath)
+                       .sortLocalOutput("*", Order.ASCENDING)
+                       .setParallelism(4);
+
+               env.execute();
+
+               BufferedReader[] resReaders = getResultReader(resultPath);
+               for (BufferedReader br : resReaders) {
+                       long cmp = Long.MIN_VALUE;
+                       while (br.ready()) {
+                               long cur = Long.parseLong(br.readLine());
+                               assertTrue("Invalid order of sorted output", cmp <= cur);
+                               cmp = cur;
+                       }
+                       br.close();
+               }
+       }
+
+}
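
sortLocalOutput sorts within each parallel sink instance, not globally: with
parallelism 1 the single output file is totally ordered, while
testSortingParallelism4 above can only check that each of its four files is
sorted internally. A minimal sketch of that distinction (the output path is
illustrative):

    import org.apache.flink.api.common.operators.Order;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class SortedSinkExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Long> ds = env.generateSequence(0, 1000);

            // "*" sorts on the whole record; with parallelism 4 each of the
            // four output files is sorted, but there is no global order.
            ds.writeAsText("/tmp/sorted-out")
                    .sortLocalOutput("*", Order.ASCENDING)
                    .setParallelism(4);

            env.execute("sorted sink example");
        }
    }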

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/DataSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/DataSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/DataSourceITCase.java
new file mode 100644
index 0000000..f1a3f08
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/DataSourceITCase.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+import org.junit.Assert;
+
+import java.util.List;
+
+/**
+ * Tests for the DataSource.
+ */
+public class DataSourceITCase extends JavaProgramTestBase {
+
+       private String inputPath;
+
+       @Override
+       protected void preSubmit() throws Exception {
+               inputPath = createTempFile("input", "ab\n"
+                               + "cd\n"
+                               + "ef\n");
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               /*
+                * Test passing a configuration object to an input format
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               Configuration ifConf = new Configuration();
+               ifConf.setString("prepend", "test");
+
+               DataSet<String> ds = env.createInput(new TestInputFormat(new Path(inputPath))).withParameters(ifConf);
+               List<String> result = ds.collect();
+
+               String expectedResult = "ab\n"
+                               + "cd\n"
+                               + "ef\n";
+
+               compareResultAsText(result, expectedResult);
+       }
+
+       private static class TestInputFormat extends TextInputFormat {
+               private static final long serialVersionUID = 1L;
+
+               public TestInputFormat(Path filePath) {
+                       super(filePath);
+               }
+
+               @Override
+               public void configure(Configuration parameters) {
+                       super.configure(parameters);
+
+                       Assert.assertNotNull(parameters.getString("prepend", null));
+                       Assert.assertEquals("test", parameters.getString("prepend", null));
+               }
+
+       }
+}
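
The configure() assertions above work because withParameters(Configuration)
hands the given configuration to the input format's configure(Configuration)
on every parallel instance. The same pattern in isolation (key name and path
are illustrative):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.Path;

    public class InputFormatParametersExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            Configuration formatConf = new Configuration();
            formatConf.setString("prepend", "test"); // read back inside configure(Configuration)

            DataSet<String> lines = env
                    .createInput(new TextInputFormat(new Path("/tmp/input")))
                    .withParameters(formatConf);

            lines.print();
        }
    }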

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/DistinctITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/DistinctITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/DistinctITCase.java
new file mode 100644
index 0000000..1215660
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/DistinctITCase.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets.CustomType;
+import org.apache.flink.test.operators.util.CollectionDataSets.POJO;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Integration tests for {@link DataSet#distinct}.
+ */
+@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
+public class DistinctITCase extends MultipleProgramsTestBase {
+
+       public DistinctITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testCorrectnessOfDistinctOnTuplesWithKeyFieldSelector() throws Exception {
+               /*
+                * check correctness of distinct on tuples with key field selector
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> distinctDs = ds.union(ds).distinct(0, 1, 2);
+
+               List<Tuple3<Integer, Long, String>> result = distinctDs.collect();
+
+               String expected = "1,1,Hi\n" +
+                               "2,2,Hello\n" +
+                               "3,2,Hello world\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testCorrectnessOfDistinctOnTuplesWithKeyFieldSelectorWithNotAllFieldsSelected() throws Exception {
+               /*
+                * check correctness of distinct on tuples with key field selector with not all fields selected
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple1<Integer>> distinctDs = ds.union(ds).distinct(0).project(0);
+
+               List<Tuple1<Integer>> result = distinctDs.collect();
+
+               String expected = "1\n" +
+                               "2\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testCorrectnessOfDistinctOnTuplesWithKeyExtractorFunction() throws Exception {
+               /*
+                * check correctness of distinct on tuples with key extractor function
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple1<Integer>> reduceDs = ds.union(ds).distinct(new KeySelector1()).project(0);
+
+               List<Tuple1<Integer>> result = reduceDs.collect();
+
+               String expected = "1\n" + "2\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       private static class KeySelector1 implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Integer> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public Integer getKey(Tuple5<Integer, Long, Integer, String, Long> in) {
+                       return in.f0;
+               }
+       }
+
+       @Test
+       public void testCorrectnessOfDistinctOnCustomTypeWithTypeExtractor() throws Exception {
+               /*
+                * check correctness of distinct on custom type with type extractor
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+               DataSet<Tuple1<Integer>> reduceDs = ds
+                               .distinct(new KeySelector3())
+                               .map(new Mapper3());
+
+               List<Tuple1<Integer>> result = reduceDs.collect();
+
+               String expected = "1\n" +
+                               "2\n" +
+                               "3\n" +
+                               "4\n" +
+                               "5\n" +
+                               "6\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       private static class Mapper3 extends RichMapFunction<CustomType, Tuple1<Integer>> {
+               @Override
+               public Tuple1<Integer> map(CustomType value) throws Exception {
+                       return new Tuple1<Integer>(value.myInt);
+               }
+       }
+
+       private static class KeySelector3 implements KeySelector<CustomType, Integer> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public Integer getKey(CustomType in) {
+                       return in.myInt;
+               }
+       }
+
+       @Test
+       public void testCorrectnessOfDistinctOnTuples() throws Exception {
+               /*
+                * check correctness of distinct on tuples
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> distinctDs = ds.union(ds).distinct();
+
+               List<Tuple3<Integer, Long, String>> result = distinctDs.collect();
+
+               String expected = "1,1,Hi\n" +
+                               "2,2,Hello\n" +
+                               "3,2,Hello world\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testCorrectnessOfDistinctOnCustomTypeWithTupleReturningTypeExtractor() throws Exception {
+               /*
+                * check correctness of distinct on custom type with tuple-returning type extractor
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple2<Integer, Long>> reduceDs = ds
+                               .distinct(new KeySelector2())
+                               .project(0, 4);
+
+               List<Tuple2<Integer, Long>> result = reduceDs.collect();
+
+               String expected = "1,1\n" +
+                               "2,1\n" +
+                               "2,2\n" +
+                               "3,2\n" +
+                               "3,3\n" +
+                               "4,1\n" +
+                               "4,2\n" +
+                               "5,1\n" +
+                               "5,2\n" +
+                               "5,3\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       private static class KeySelector2 implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) {
+                       return new Tuple2<Integer, Long>(t.f0, t.f4);
+               }
+       }
+
+       @Test
+       public void testCorrectnessOfDistinctOnTuplesWithFieldExpressions() throws Exception {
+               /*
+                * check correctness of distinct on tuples with field expressions
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple1<Integer>> reduceDs = ds.union(ds).distinct("f0").project(0);
+
+               List<Tuple1<Integer>> result = reduceDs.collect();
+
+               String expected = "1\n" +
+                               "2\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testCorrectnessOfDistinctOnPojos() throws Exception {
+               /*
+                * check correctness of distinct on Pojos
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
+               DataSet<Integer> reduceDs = ds.distinct("nestedPojo.longNumber").map(new Mapper2());
+
+               List<Integer> result = reduceDs.collect();
+
+               String expected = "10000\n20000\n30000\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       private static class Mapper2 implements MapFunction<CollectionDataSets.POJO, Integer> {
+               @Override
+               public Integer map(POJO value) throws Exception {
+                       return (int) value.nestedPojo.longNumber;
+               }
+       }
+
+       @Test
+       public void testDistinctOnFullPojo() throws Exception {
+               /*
+                * distinct on full Pojo
+                */
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
+               DataSet<Integer> reduceDs = ds.distinct().map(new Mapper1());
+
+               List<Integer> result = reduceDs.collect();
+
+               String expected = "10000\n20000\n30000\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       private static class Mapper1 implements MapFunction<CollectionDataSets.POJO, Integer> {
+               @Override
+               public Integer map(POJO value) throws Exception {
+                       return (int) value.nestedPojo.longNumber;
+               }
+       }
+
+       @Test
+       public void testCorrectnessOfDistinctOnAtomic() throws Exception {
+               /*
+                * check correctness of distinct on Integers
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Integer> ds = CollectionDataSets.getIntegerDataSet(env);
+               DataSet<Integer> reduceDs = ds.distinct();
+
+               List<Integer> result = reduceDs.collect();
+
+               String expected = "1\n2\n3\n4\n5";
+
+               compareResultAsText(result, expected);
+       }
+
+       @Test
+       public void testCorrectnessOfDistinctOnAtomicWithSelectAllChar() throws Exception {
+               /*
+                * check correctness of distinct on Strings, using Keys.ExpressionKeys.SELECT_ALL_CHAR
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
+               DataSet<String> reduceDs = ds.union(ds).distinct("*");
+
+               List<String> result = reduceDs.collect();
+
+               String expected = "I am fine.\n" +
+                               "Luke Skywalker\n" +
+                               "LOL\n" +
+                               "Hello world, how are you?\n" +
+                               "Hi\n" +
+                               "Hello world\n" +
+                               "Hello\n" +
+                               "Random comment\n";
+
+               compareResultAsText(result, expected);
+       }
+}
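
The tests above cover every distinct() variant: no arguments (the full record),
field positions, field expressions such as "f0" or "nestedPojo.longNumber", the
select-all wildcard "*", and a KeySelector. A minimal sketch contrasting them
(the sample data is illustrative):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class DistinctExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Tuple2<Integer, String>> data = env.fromElements(
                    new Tuple2<>(1, "a"), new Tuple2<>(1, "a"),
                    new Tuple2<>(1, "b"), new Tuple2<>(2, "c"));

            data.distinct().print();     // full record as key: 3 results
            data.distinct(0).print();    // field position f0: 2 results
            data.distinct("f0").print(); // field expression, equivalent to the line above
            data.distinct(new KeySelector<Tuple2<Integer, String>, Integer>() {
                @Override
                public Integer getKey(Tuple2<Integer, String> t) {
                    return t.f0;         // extractor-based key: 2 results
                }
            }).print();
        }
    }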

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
new file mode 100644
index 0000000..f7f993b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ExecutionEnvironment} from a user's perspective.
+ */
+@SuppressWarnings("serial")
+public class ExecutionEnvironmentITCase extends TestLogger {
+
+       private static final int PARALLELISM = 5;
+
+       /**
+        * Ensure that the user can pass a custom configuration object to the LocalEnvironment.
+        */
+       @Test
+       public void testLocalEnvironmentWithConfig() throws Exception {
+               Configuration conf = new Configuration();
+               conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
+
+               final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
+               env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+               env.getConfig().disableSysoutLogging();
+
+               DataSet<Integer> result = env.createInput(new ParallelismDependentInputFormat())
+                               .rebalance()
+                               .mapPartition(new RichMapPartitionFunction<Integer, Integer>() {
+                                       @Override
+                                       public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
+                                               out.collect(getRuntimeContext().getIndexOfThisSubtask());
+                                       }
+                               });
+               List<Integer> resultCollection = result.collect();
+               assertEquals(PARALLELISM, resultCollection.size());
+       }
+
+       private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
+
+               private transient boolean emitted;
+
+               @Override
+               public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
+                       assertEquals(PARALLELISM, numSplits);
+                       return super.createInputSplits(numSplits);
+               }
+
+               @Override
+               public boolean reachedEnd() {
+                       return emitted;
+               }
+
+               @Override
+               public Integer nextRecord(Integer reuse) {
+                       if (emitted) {
+                               return null;
+                       }
+                       emitted = true;
+                       return 1;
+               }
+       }
+}
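
testLocalEnvironmentWithConfig relies on two pieces of the API: a Configuration
passed to createLocalEnvironment sizes the embedded TaskManager, and
PARALLELISM_AUTO_MAX then runs operators with one subtask per available slot. A
minimal sketch (the slot count is illustrative):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;

    public class LocalEnvironmentConfigExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // give the embedded TaskManager 5 task slots instead of the default
            conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 5);

            ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
            // one subtask per available slot
            env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);

            env.fromElements(1, 2, 3, 4, 5).rebalance().print();
        }
    }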

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/FilterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/FilterITCase.java
new file mode 100644
index 0000000..6fa2dce
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/FilterITCase.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets.CustomType;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Integration tests for {@link FilterFunction} and {@link RichFilterFunction}.
+ */
+@RunWith(Parameterized.class)
+public class FilterITCase extends MultipleProgramsTestBase {
+       public FilterITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testAllRejectingFilter() throws Exception {
+               /*
+                * Test all-rejecting filter.
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+                               filter(new Filter1());
+
+               List<Tuple3<Integer, Long, String>> result = filterDs.collect();
+
+               String expected = "\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       private static class Filter1 implements FilterFunction<Tuple3<Integer, Long, String>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
+                       return false;
+               }
+       }
+
+       @Test
+       public void testAllPassingFilter() throws Exception {
+               /*
+                * Test all-passing filter.
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+                               filter(new Filter2());
+               List<Tuple3<Integer, Long, String>> result = filterDs.collect();
+
+               String expected = "1,1,Hi\n" +
+                               "2,2,Hello\n" +
+                               "3,2,Hello world\n" +
+                               "4,3,Hello world, how are you?\n" +
+                               "5,3,I am fine.\n" +
+                               "6,3,Luke Skywalker\n" +
+                               "7,4,Comment#1\n" +
+                               "8,4,Comment#2\n" +
+                               "9,4,Comment#3\n" +
+                               "10,4,Comment#4\n" +
+                               "11,5,Comment#5\n" +
+                               "12,5,Comment#6\n" +
+                               "13,5,Comment#7\n" +
+                               "14,5,Comment#8\n" +
+                               "15,5,Comment#9\n" +
+                               "16,6,Comment#10\n" +
+                               "17,6,Comment#11\n" +
+                               "18,6,Comment#12\n" +
+                               "19,6,Comment#13\n" +
+                               "20,6,Comment#14\n" +
+                               "21,6,Comment#15\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       private static class Filter2 implements FilterFunction<Tuple3<Integer, Long, String>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
+                       return true;
+               }
+       }
+
+       @Test
+       public void testFilterOnStringTupleField() throws Exception {
+               /*
+                * Test filter on String tuple field.
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+                               filter(new Filter3());
+               List<Tuple3<Integer, Long, String>> result = filterDs.collect();
+
+               String expected = "3,2,Hello world\n" +
+                               "4,3,Hello world, how are you?\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       private static class Filter3 implements FilterFunction<Tuple3<Integer, Long, String>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
+                       return value.f2.contains("world");
+               }
+       }
+
+       @Test
+       public void testFilterOnIntegerTupleField() throws Exception {
+               /*
+                * Test filter on Integer tuple field.
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+                               filter(new Filter4());
+               List<Tuple3<Integer, Long, String>> result = filterDs.collect();
+
+               String expected = "2,2,Hello\n" +
+                               "4,3,Hello world, how are you?\n" +
+                               "6,3,Luke Skywalker\n" +
+                               "8,4,Comment#2\n" +
+                               "10,4,Comment#4\n" +
+                               "12,5,Comment#6\n" +
+                               "14,5,Comment#8\n" +
+                               "16,6,Comment#10\n" +
+                               "18,6,Comment#12\n" +
+                               "20,6,Comment#14\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       private static class Filter4 implements FilterFunction<Tuple3<Integer, Long, String>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
+                       return (value.f0 % 2) == 0;
+               }
+       }
+
+       @Test
+       public void testFilterBasicType() throws Exception {
+               /*
+                * Test filter on basic type
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
+               DataSet<String> filterDs = ds.
+                               filter(new Filter5());
+               List<String> result = filterDs.collect();
+
+               String expected = "Hi\n" +
+                               "Hello\n" +
+                               "Hello world\n" +
+                               "Hello world, how are you?\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       private static class Filter5 implements FilterFunction<String> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public boolean filter(String value) throws Exception {
+                       return value.startsWith("H");
+               }
+       }
+
+       @Test
+       public void testFilterOnCustomType() throws Exception {
+               /*
+                * Test filter on custom type
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+               DataSet<CustomType> filterDs = ds.
+                               filter(new Filter6());
+               List<CustomType> result = filterDs.collect();
+
+               String expected = "3,3,Hello world, how are you?\n" +
+                               "3,4,I am fine.\n" +
+                               "3,5,Luke Skywalker\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       private static class Filter6 implements FilterFunction<CustomType> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public boolean filter(CustomType value) throws Exception {
+                       return value.myString.contains("a");
+               }
+       }
+
+       @Test
+       public void testRichFilterOnStringTupleField() throws Exception {
+               /*
+                * Test rich filter on String tuple field, using a broadcast variable.
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env);
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+                               filter(new RichFilter1()).withBroadcastSet(ints, "ints");
+               List<Tuple3<Integer, Long, String>> result = filterDs.collect();
+
+               String expected = "1,1,Hi\n" +
+                               "2,2,Hello\n" +
+                               "3,2,Hello world\n" +
+                               "4,3,Hello world, how are you?\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       private static class RichFilter1 extends RichFilterFunction<Tuple3<Integer, Long, String>> {
+               private static final long serialVersionUID = 1L;
+
+               int literal = -1;
+
+               @Override
+               public void open(Configuration config) {
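+                       // Track the maximum of the broadcast "ints" values; filter()
+                       // below passes tuples whose f0 is strictly smaller than it.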
+                       Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
+                       for (int i: ints) {
+                               literal = literal < i ? i : literal;
+                       }
+               }
+
+               @Override
+               public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
+                       return value.f0 < literal;
+               }
+       }
+
+       @Test
+       public void testFilterWithBroadcastVariables() throws Exception {
+               /*
+                * Test filter with broadcast variables
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+                               filter(new RichFilter2()).withBroadcastSet(intDs, "ints");
+               List<Tuple3<Integer, Long, String>> result = filterDs.collect();
+
+               String expected = "11,5,Comment#5\n" +
+                               "12,5,Comment#6\n" +
+                               "13,5,Comment#7\n" +
+                               "14,5,Comment#8\n" +
+                               "15,5,Comment#9\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       private static class RichFilter2 extends RichFilterFunction<Tuple3<Integer, Long, String>> {
+               private static final long serialVersionUID = 1L;
+               private int broadcastSum = 0;
+
+               @Override
+               public void open(Configuration config) {
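+                       // Sum all broadcast "ints" values; filter() below keeps tuples
+                       // whose group key f1 equals broadcastSum / 11 (integer division).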
+                       Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
+                       for (Integer i : ints) {
+                               broadcastSum += i;
+                       }
+               }
+
+               @Override
+               public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
+                       return (value.f1 == (broadcastSum / 11));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/FirstNITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/FirstNITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/FirstNITCase.java
new file mode 100644
index 0000000..abbd446
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/FirstNITCase.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Integration tests for {@link DataSet#first}.
+ */
+@RunWith(Parameterized.class)
+public class FirstNITCase extends MultipleProgramsTestBase {
+       public FirstNITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testFirstNOnUngroupedDS() throws Exception {
+               /*
+                * First-n on ungrouped data set
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple1<Integer>> seven = ds.first(7).map(new OneMapper()).sum(0);
+
+               List<Tuple1<Integer>> result = seven.collect();
+
+               String expected = "(7)\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       @Test
+       public void testFirstNOnGroupedDS() throws Exception {
+               /*
+                * First-n on grouped data set
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).first(4)
+                               .map(new OneMapper2()).groupBy(0).sum(1);
+
+               List<Tuple2<Long, Integer>> result = first.collect();
+
+               String expected = "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       @Test
+       public void testFirstNOnGroupedAndSortedDS() throws Exception {
+               /*
+                * First-n on grouped and sorted data set
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).sortGroup(0, Order.DESCENDING).first(3)
+                               .project(1, 0);
+
+               List<Tuple2<Long, Integer>> result = first.collect();
+
+               String expected = "(1,1)\n"
+                               + "(2,3)\n(2,2)\n"
+                               + "(3,6)\n(3,5)\n(3,4)\n"
+                               + "(4,10)\n(4,9)\n(4,8)\n"
+                               + "(5,15)\n(5,14)\n(5,13)\n"
+                               + "(6,21)\n(6,20)\n(6,19)\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       /**
+        * Test for FLINK-2135.
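+        *
+        * <p>Builds a plan whose groupBy and sortGroup key selectors use different key
+        * types ({@code Long} vs. {@code Double}); constructing it must not fail with a
+        * bad cast. Only {@code b} is collected; {@code a} exists solely to build the plan.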
+        */
+       @Test
+       public void testFaultyCast() throws Exception {
+               ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<String> b = ee.fromElements("a", "b");
+               GroupReduceOperator<String, String> a = b.groupBy(new KeySelector<String, Long>() {
+                       @Override
+                       public Long getKey(String value) throws Exception {
+                               return 1L;
+                       }
+               }).sortGroup(new KeySelector<String, Double>() {
+                       @Override
+                       public Double getKey(String value) throws Exception {
+                               return 1.0;
+                       }
+               }, Order.DESCENDING).first(1);
+
+               List<String> result = b.collect();
+
+               String expected = "a\nb";
+
+               compareResultAsText(result, expected);
+       }
+
+       private static class OneMapper implements MapFunction<Tuple3<Integer, Long, String>, Tuple1<Integer>> {
+               private static final long serialVersionUID = 1L;
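+               // A single reused output tuple holding the constant 1; returning the
+               // same instance for every input record is safe because the value never
+               // changes.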
+               private final Tuple1<Integer> one = new Tuple1<Integer>(1);
+               @Override
+               public Tuple1<Integer> map(Tuple3<Integer, Long, String> value) {
+                       return one;
+               }
+       }
+
+       private static class OneMapper2 implements MapFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>> {
+               private static final long serialVersionUID = 1L;
+               private final Tuple2<Long, Integer> one = new Tuple2<>(0L, 1);
+
+               @Override
+               public Tuple2<Long, Integer> map(Tuple3<Integer, Long, String> value) {
+                       one.f0 = value.f1;
+                       return one;
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/FlatMapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/FlatMapITCase.java
new file mode 100644
index 0000000..4b683a9
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/FlatMapITCase.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets.CustomType;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Integration tests for {@link FlatMapFunction} and {@link RichFlatMapFunction}.
+ */
+@RunWith(Parameterized.class)
+public class FlatMapITCase extends MultipleProgramsTestBase {
+       public FlatMapITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testNonPassingFlatMap() throws Exception {
+               /*
+                * Test non-passing flatmap
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
+               DataSet<String> nonPassingFlatMapDs = ds.
+                               flatMap(new FlatMapper1());
+
+               List<String> result = nonPassingFlatMapDs.collect();
+
+               String expected = "\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       private static class FlatMapper1 implements FlatMapFunction<String, String> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void flatMap(String value, Collector<String> out) throws Exception {
+                       if (value.contains("bananas")) {
+                               out.collect(value);
+                       }
+               }
+       }
+
+       @Test
+       public void testDataDuplicatingFlatMap() throws Exception {
+               /*
+                * Test data duplicating flatmap
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
+               DataSet<String> duplicatingFlatMapDs = ds.
+                               flatMap(new FlatMapper2());
+
+               List<String> result = duplicatingFlatMapDs.collect();
+
+               String expected = "Hi\n" + "HI\n" +
+                               "Hello\n" + "HELLO\n" +
+                               "Hello world\n" + "HELLO WORLD\n" +
+                               "Hello world, how are you?\n" + "HELLO WORLD, HOW ARE YOU?\n" +
+                               "I am fine.\n" + "I AM FINE.\n" +
+                               "Luke Skywalker\n" + "LUKE SKYWALKER\n" +
+                               "Random comment\n" + "RANDOM COMMENT\n" +
+                               "LOL\n" + "LOL\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       private static class FlatMapper2 implements FlatMapFunction<String, String> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void flatMap(String value, Collector<String> out) throws Exception {
+                       out.collect(value);
+                       out.collect(value.toUpperCase());
+               }
+       }
+
+       @Test
+       public void testFlatMapWithVaryingNumberOfEmittedTuples() throws Exception {
+               /*
+                * Test flatmap with varying number of emitted tuples
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> varyingTuplesMapDs = ds.
+                               flatMap(new FlatMapper3());
+
+               List<Tuple3<Integer, Long, String>> result = varyingTuplesMapDs.collect();
+
+               String expected = "1,1,Hi\n" +
+                               "2,2,Hello\n" + "2,2,Hello\n" +
+                               "4,3,Hello world, how are you?\n" +
+                               "5,3,I am fine.\n" + "5,3,I am fine.\n" +
+                               "7,4,Comment#1\n" +
+                               "8,4,Comment#2\n" + "8,4,Comment#2\n" +
+                               "10,4,Comment#4\n" +
+                               "11,5,Comment#5\n" + "11,5,Comment#5\n" +
+                               "13,5,Comment#7\n" +
+                               "14,5,Comment#8\n" + "14,5,Comment#8\n" +
+                               "16,6,Comment#10\n" +
+                               "17,6,Comment#11\n" + "17,6,Comment#11\n" +
+                               "19,6,Comment#13\n" +
+                               "20,6,Comment#14\n" + "20,6,Comment#14\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       private static class FlatMapper3 implements FlatMapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void flatMap(Tuple3<Integer, Long, String> value,
+                               Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+                       final int numTuples = value.f0 % 3;
+                       for (int i = 0; i < numTuples; i++) {
+                               out.collect(value);
+                       }
+               }
+       }
+
+       @Test
+       public void testTypeConversionFlatMapperCustomToTuple() throws Exception {
+               /*
+                * Test type conversion flatmapper (Custom -> Tuple)
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> typeConversionFlatMapDs = ds.
+                               flatMap(new FlatMapper4());
+
+               List<Tuple3<Integer, Long, String>> result = typeConversionFlatMapDs.collect();
+
+               String expected = "1,0,Hi\n" +
+                               "2,1,Hello\n" +
+                               "2,2,Hello world\n" +
+                               "3,3,Hello world, how are you?\n" +
+                               "3,4,I am fine.\n" +
+                               "3,5,Luke Skywalker\n" +
+                               "4,6,Comment#1\n" +
+                               "4,7,Comment#2\n" +
+                               "4,8,Comment#3\n" +
+                               "4,9,Comment#4\n" +
+                               "5,10,Comment#5\n" +
+                               "5,11,Comment#6\n" +
+                               "5,12,Comment#7\n" +
+                               "5,13,Comment#8\n" +
+                               "5,14,Comment#9\n" +
+                               "6,15,Comment#10\n" +
+                               "6,16,Comment#11\n" +
+                               "6,17,Comment#12\n" +
+                               "6,18,Comment#13\n" +
+                               "6,19,Comment#14\n" +
+                               "6,20,Comment#15\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       private static class FlatMapper4 implements FlatMapFunction<CustomType, Tuple3<Integer, Long, String>> {
+               private static final long serialVersionUID = 1L;
+               private final Tuple3<Integer, Long, String> outTuple =
+                               new Tuple3<Integer, Long, String>();
+
+               @Override
+               public void flatMap(CustomType value, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+                       outTuple.setField(value.myInt, 0);
+                       outTuple.setField(value.myLong, 1);
+                       outTuple.setField(value.myString, 2);
+                       out.collect(outTuple);
+               }
+       }
+
+       @Test
+       public void testTypeConversionFlatMapperTupleToBasic() throws Exception {
+               /*
+                * Test type conversion flatmapper (Tuple -> Basic)
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<String> typeConversionFlatMapDs = ds.
+                               flatMap(new FlatMapper5());
+
+               List<String> result = typeConversionFlatMapDs.collect();
+
+               String expected = "Hi\n" + "Hello\n" + "Hello world\n" +
+                               "Hello world, how are you?\n" +
+                               "I am fine.\n" + "Luke Skywalker\n" +
+                               "Comment#1\n" + "Comment#2\n" +
+                               "Comment#3\n" + "Comment#4\n" +
+                               "Comment#5\n" + "Comment#6\n" +
+                               "Comment#7\n" + "Comment#8\n" +
+                               "Comment#9\n" + "Comment#10\n" +
+                               "Comment#11\n" + "Comment#12\n" +
+                               "Comment#13\n" + "Comment#14\n" +
+                               "Comment#15\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       private static class FlatMapper5 implements 
FlatMapFunction<Tuple3<Integer, Long, String>, String> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void flatMap(Tuple3<Integer, Long, String> value, 
Collector<String> out) throws Exception {
+                       out.collect(value.f2);
+               }
+       }
+
+       @Test
+       public void testFlatMapperIfUDFReturnsInputObjectMultipleTimesWhileChangingIt() throws Exception {
+               /*
+                * Test flatmapper if UDF returns input object
+                * multiple times and changes it in between
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> inputObjFlatMapDs = ds.
+                               flatMap(new FlatMapper6());
+
+               List<Tuple3<Integer, Long, String>> result = inputObjFlatMapDs.collect();
+
+               String expected = "0,1,Hi\n" +
+                               "0,2,Hello\n" + "1,2,Hello\n" +
+                               "0,2,Hello world\n" + "1,2,Hello world\n" + "2,2,Hello world\n" +
+                               "0,3,I am fine.\n" +
+                               "0,3,Luke Skywalker\n" + "1,3,Luke Skywalker\n" +
+                               "0,4,Comment#1\n" + "1,4,Comment#1\n" + "2,4,Comment#1\n" +
+                               "0,4,Comment#3\n" +
+                               "0,4,Comment#4\n" + "1,4,Comment#4\n" +
+                               "0,5,Comment#5\n" + "1,5,Comment#5\n" + "2,5,Comment#5\n" +
+                               "0,5,Comment#7\n" +
+                               "0,5,Comment#8\n" + "1,5,Comment#8\n" +
+                               "0,5,Comment#9\n" + "1,5,Comment#9\n" + "2,5,Comment#9\n" +
+                               "0,6,Comment#11\n" +
+                               "0,6,Comment#12\n" + "1,6,Comment#12\n" +
+                               "0,6,Comment#13\n" + "1,6,Comment#13\n" + "2,6,Comment#13\n" +
+                               "0,6,Comment#15\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       private static class FlatMapper6 implements FlatMapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+               private static final long serialVersionUID = 1L;
+
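+               // Emits the input tuple several times, mutating its first field between
+               // emits; the expected output relies on each emitted record reflecting
+               // the field value at emit time.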
+               @Override
+               public void flatMap(Tuple3<Integer, Long, String> value,
+                               Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+                       final int numTuples = value.f0 % 4;
+                       for (int i = 0; i < numTuples; i++) {
+                               value.setField(i, 0);
+                               out.collect(value);
+                       }
+               }
+       }
+
+       @Test
+       public void testFlatMapWithBroadcastSet() throws Exception {
+               /*
+                * Test flatmap with broadcast set
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env);
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> bcFlatMapDs = ds.
+                               flatMap(new RichFlatMapper1()).withBroadcastSet(ints, "ints");
+               List<Tuple3<Integer, Long, String>> result = bcFlatMapDs.collect();
+
+               String expected = "55,1,Hi\n" +
+                               "55,2,Hello\n" +
+                               "55,2,Hello world\n" +
+                               "55,3,Hello world, how are you?\n" +
+                               "55,3,I am fine.\n" +
+                               "55,3,Luke Skywalker\n" +
+                               "55,4,Comment#1\n" +
+                               "55,4,Comment#2\n" +
+                               "55,4,Comment#3\n" +
+                               "55,4,Comment#4\n" +
+                               "55,5,Comment#5\n" +
+                               "55,5,Comment#6\n" +
+                               "55,5,Comment#7\n" +
+                               "55,5,Comment#8\n" +
+                               "55,5,Comment#9\n" +
+                               "55,6,Comment#10\n" +
+                               "55,6,Comment#11\n" +
+                               "55,6,Comment#12\n" +
+                               "55,6,Comment#13\n" +
+                               "55,6,Comment#14\n" +
+                               "55,6,Comment#15\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       private static class RichFlatMapper1 extends RichFlatMapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+               private static final long serialVersionUID = 1L;
+               private final Tuple3<Integer, Long, String> outTuple =
+                               new Tuple3<Integer, Long, String>();
+               private Integer f2Replace = 0;
+
+               @Override
+               public void open(Configuration config) {
+                       Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
+                       int sum = 0;
+                       for (Integer i : ints) {
+                               sum += i;
+                       }
+                       f2Replace = sum;
+               }
+
+               @Override
+               public void flatMap(Tuple3<Integer, Long, String> value,
+                               Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+                       outTuple.setFields(f2Replace, value.f1, value.f2);
+                       out.collect(outTuple);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/GroupCombineITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/GroupCombineITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/GroupCombineITCase.java
new file mode 100644
index 0000000..f0ee031
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/GroupCombineITCase.java
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.UnsortedGrouping;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * The GroupCombine operator is not easy to test because it is essentially just a combiner:
+ * its output may equal the output of a regular groupReduce at any stage of its execution.
+ * The basic idea is to preserve the grouping key in the partial result, so that a
+ * reduceGroup afterwards can finalize the results for verification. In addition, we can use
+ * hashPartition to partition the data and check that only combining, and no shuffling, has
+ * been performed.
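+ *
+ * <p>A minimal sketch of the verification pattern used below (helper names are the
+ * ones defined in this class; the wrapping key is the {@code Long} grouping field):
+ * <pre>{@code
+ * input.map(new Tuple3KvWrapper())                      // wrap each value as (key, value)
+ *      .groupBy(0)
+ *      .combineGroup(new Tuple3toTuple3GroupReduce())   // partial, key-preserving reduce
+ *      .groupBy(0)
+ *      .reduceGroup(new Tuple3toTuple3GroupReduce());   // final reduce for verification
+ * }</pre>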
+ */
+@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
+public class GroupCombineITCase extends MultipleProgramsTestBase {
+
+       public GroupCombineITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       private static final String identityResult = "1,1,Hi\n" +
+                       "2,2,Hello\n" +
+                       "3,2,Hello world\n" +
+                       "4,3,Hello world, how are you?\n" +
+                       "5,3,I am fine.\n" +
+                       "6,3,Luke Skywalker\n" +
+                       "7,4,Comment#1\n" +
+                       "8,4,Comment#2\n" +
+                       "9,4,Comment#3\n" +
+                       "10,4,Comment#4\n" +
+                       "11,5,Comment#5\n" +
+                       "12,5,Comment#6\n" +
+                       "13,5,Comment#7\n" +
+                       "14,5,Comment#8\n" +
+                       "15,5,Comment#9\n" +
+                       "16,6,Comment#10\n" +
+                       "17,6,Comment#11\n" +
+                       "18,6,Comment#12\n" +
+                       "19,6,Comment#13\n" +
+                       "20,6,Comment#14\n" +
+                       "21,6,Comment#15\n";
+
+       @Test
+       public void testAllGroupCombineIdentity() throws Exception {
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+               DataSet<Tuple3<Integer, Long, String>> reduceDs = ds
+                               // combine
+                               .combineGroup(new IdentityFunction())
+                               // fully reduce
+                               .reduceGroup(new IdentityFunction());
+
+               List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
+
+               compareResultAsTuples(result, identityResult);
+       }
+
+       @Test
+       public void testIdentity() throws Exception {
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+               DataSet<Tuple3<Integer, Long, String>> reduceDs = ds
+                               // combine
+                               .combineGroup(new IdentityFunction())
+                               // fully reduce
+                               .reduceGroup(new IdentityFunction());
+
+               List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
+
+               compareResultAsTuples(result, identityResult);
+       }
+
+       @Test
+       public void testIdentityWithGroupBy() throws Exception {
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+               DataSet<Tuple3<Integer, Long, String>> reduceDs = ds
+                               .groupBy(1)
+                               // combine
+                               .combineGroup(new IdentityFunction())
+                               // fully reduce
+                               .reduceGroup(new IdentityFunction());
+
+               List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
+
+               compareResultAsTuples(result, identityResult);
+       }
+
+       @Test
+       public void testIdentityWithGroupByAndSort() throws Exception {
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+               DataSet<Tuple3<Integer, Long, String>> reduceDs = ds
+                               .groupBy(1)
+                               .sortGroup(1, Order.DESCENDING)
+                               // reduce partially
+                               .combineGroup(new IdentityFunction())
+                               .groupBy(1)
+                               .sortGroup(1, Order.DESCENDING)
+                               // fully reduce
+                               .reduceGroup(new IdentityFunction());
+
+               List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
+
+               compareResultAsTuples(result, identityResult);
+       }
+
+       @Test
+       public void testPartialReduceWithIdenticalInputOutputType() throws Exception {
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               // data
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+               DataSet<Tuple2<Long, Tuple3<Integer, Long, String>>> dsWrapped = ds
+                               // wrap values as KV pairs with the grouping key as key
+                               .map(new Tuple3KvWrapper());
+
+               List<Tuple3<Integer, Long, String>> result = dsWrapped
+                               .groupBy(0)
+                               // reduce partially
+                               .combineGroup(new Tuple3toTuple3GroupReduce())
+                               .groupBy(0)
+                               // reduce fully to check result
+                               .reduceGroup(new Tuple3toTuple3GroupReduce())
+                               //unwrap
+                               .map(new MapFunction<Tuple2<Long, Tuple3<Integer, Long, String>>, Tuple3<Integer, Long, String>>() {
+                                       @Override
+                                       public Tuple3<Integer, Long, String> map(Tuple2<Long, Tuple3<Integer, Long, String>> value) throws Exception {
+                                               return value.f1;
+                                       }
+                               }).collect();
+
+               String expected = "1,1,combined\n" +
+                               "5,4,combined\n" +
+                               "15,9,combined\n" +
+                               "34,16,combined\n" +
+                               "65,25,combined\n" +
+                               "111,36,combined\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testPartialReduceWithDifferentInputOutputType() throws Exception {
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               // data
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+               DataSet<Tuple2<Long, Tuple3<Integer, Long, String>>> dsWrapped = ds
+                               // wrap values as KV pairs with the grouping key as key
+                               .map(new Tuple3KvWrapper());
+
+               List<Tuple2<Integer, Long>> result = dsWrapped
+                               .groupBy(0)
+                               // reduce partially
+                               .combineGroup(new Tuple3toTuple2GroupReduce())
+                               .groupBy(0)
+                               // reduce fully to check result
+                               .reduceGroup(new Tuple2toTuple2GroupReduce())
+                               //unwrap
+                               .map(new MapFunction<Tuple2<Long, Tuple2<Integer, Long>>, Tuple2<Integer, Long>>() {
+                                       @Override
+                                       public Tuple2<Integer, Long> map(Tuple2<Long, Tuple2<Integer, Long>> value) throws Exception {
+                                               return value.f1;
+                                       }
+                               }).collect();
+
+               String expected = "1,3\n" +
+                               "5,20\n" +
+                               "15,58\n" +
+                               "34,52\n" +
+                               "65,70\n" +
+                               "111,96\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       // check if no shuffle is being executed
+       public void testCheckPartitionShuffleGroupBy() throws Exception {
+
+               org.junit.Assume.assumeTrue(mode != TestExecutionMode.COLLECTION);
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               // data
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+               // partition and group data
+               UnsortedGrouping<Tuple3<Integer, Long, String>> partitionedDS = ds.partitionByHash(0).groupBy(1);
+
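+               // The combine below runs locally on the hash-partitioned data, so groups
+               // keyed by f1 stay fragmented across partitions and the collected counts
+               // must differ from the fully reduced expectation asserted not-equal below.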
+               List<Tuple2<Long, Integer>> result = partitionedDS
+                               .combineGroup(
+                                               new GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() {
+                       @Override
+                       public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Long, Integer>> out) throws Exception {
+                               int count = 0;
+                               long key = 0;
+                               for (Tuple3<Integer, Long, String> value : values) {
+                                       key = value.f1;
+                                       count++;
+                               }
+                               out.collect(new Tuple2<>(key, count));
+                       }
+               }).collect();
+
+               String[] localExpected = new String[] { "(6,6)", "(5,5)", "(4,4)", "(3,3)", "(2,2)", "(1,1)" };
+
+               String[] resultAsStringArray = new String[result.size()];
+               for (int i = 0; i < resultAsStringArray.length; ++i) {
+                       resultAsStringArray[i] = result.get(i).toString();
+               }
+               Arrays.sort(resultAsStringArray);
+
+               Assert.assertEquals("The two arrays were identical.", false, Arrays.equals(localExpected, resultAsStringArray));
+       }
+
+       @Test
+       // check if parallelism of 1 produces the same result as a shuffle would
+       public void testCheckPartitionShuffleDOP1() throws Exception {
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               env.setParallelism(1);
+
+               // data
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+               // partition and group data
+               UnsortedGrouping<Tuple3<Integer, Long, String>> partitionedDS = ds.partitionByHash(0).groupBy(1);
+
+               List<Tuple2<Long, Integer>> result = partitionedDS
+                               .combineGroup(
+                               new GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() {
+                                       @Override
+                                       public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Long, Integer>> out) throws Exception {
+                                               int count = 0;
+                                               long key = 0;
+                                               for (Tuple3<Integer, Long, String> value : values) {
+                                                       key = value.f1;
+                                                       count++;
+                                               }
+                                               out.collect(new Tuple2<>(key, count));
+                                       }
+                               }).collect();
+
+               String expected = "6,6\n" +
+                               "5,5\n" +
+                               "4,4\n" +
+                               "3,3\n" +
+                               "2,2\n" +
+                               "1,1\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       // check if all API methods are callable
+       public void testAPI() throws Exception {
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple1<String>> ds = CollectionDataSets.getStringDataSet(env).map(new MapFunction<String, Tuple1<String>>() {
+                       @Override
+                       public Tuple1<String> map(String value) throws Exception {
+                               return new Tuple1<>(value);
+                       }
+               });
+
+               // all methods on DataSet
+               ds.combineGroup(new GroupCombineFunctionExample())
+               .output(new DiscardingOutputFormat<Tuple1<String>>());
+
+               // all methods on UnsortedGrouping
+               ds.groupBy(0).combineGroup(new GroupCombineFunctionExample())
+               .output(new DiscardingOutputFormat<Tuple1<String>>());
+
+               // all methods on SortedGrouping
+               ds.groupBy(0).sortGroup(0, Order.ASCENDING).combineGroup(new GroupCombineFunctionExample())
+               .output(new DiscardingOutputFormat<Tuple1<String>>());
+
+               env.execute();
+       }
+
+       private static class GroupCombineFunctionExample implements GroupCombineFunction<Tuple1<String>, Tuple1<String>> {
+
+               @Override
+               public void combine(Iterable<Tuple1<String>> values, Collector<Tuple1<String>> out) throws Exception {
+                       for (Tuple1<String> value : values) {
+                               out.collect(value);
+                       }
+               }
+       }
+
+       /**
+        * For Scala GroupCombineITCase.
+        */
+       public static class ScalaGroupCombineFunctionExample implements GroupCombineFunction<scala.Tuple1<String>, scala.Tuple1<String>> {
+
+               @Override
+               public void combine(Iterable<scala.Tuple1<String>> values, Collector<scala.Tuple1<String>> out) throws Exception {
+                       for (scala.Tuple1<String> value : values) {
+                               out.collect(value);
+                       }
+               }
+       }
+
+       private static class IdentityFunction implements GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>,
+                       GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+
+               @Override
+               public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+                       for (Tuple3<Integer, Long, String> value : values) {
+                               out.collect(new Tuple3<>(value.f0, value.f1, value.f2));
+                       }
+               }
+
+               @Override
+               public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+                       for (Tuple3<Integer, Long, String> value : values) {
+                               out.collect(new Tuple3<>(value.f0, value.f1, value.f2));
+                       }
+               }
+       }
+
+       private static class Tuple3toTuple3GroupReduce implements KvGroupReduce<Long, Tuple3<Integer, Long, String>,
+                       Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+
+               @Override
+               public void combine(Iterable<Tuple2<Long, Tuple3<Integer, Long, String>>> values,
+                               Collector<Tuple2<Long, Tuple3<Integer, Long, String>>> out) throws Exception {
+                       int i = 0;
+                       long l = 0;
+                       long key = 0;
+
+                       // collapse groups
+                       for (Tuple2<Long, Tuple3<Integer, Long, String>> value : values) {
+                               key = value.f0;
+                               Tuple3<Integer, Long, String> extracted = value.f1;
+                               i += extracted.f0;
+                               l += extracted.f1;
+                       }
+
+                       Tuple3<Integer, Long, String> result = new Tuple3<>(i, l, "combined");
+                       out.collect(new Tuple2<>(key, result));
+               }
+
+               @Override
+               public void reduce(Iterable<Tuple2<Long, Tuple3<Integer, Long, String>>> values,
+                               Collector<Tuple2<Long, Tuple3<Integer, Long, String>>> out) throws Exception {
+                       combine(values, out);
+               }
+       }
+
+       private static class Tuple3toTuple2GroupReduce implements KvGroupReduce<Long, Tuple3<Integer, Long, String>,
+                       Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
+
+               @Override
+               public void combine(Iterable<Tuple2<Long, Tuple3<Integer, Long, String>>> values,
+                               Collector<Tuple2<Long, Tuple2<Integer, Long>>> out) throws Exception {
+                       int i = 0;
+                       long l = 0;
+                       long key = 0;
+
+                       // collapse groups
+                       for (Tuple2<Long, Tuple3<Integer, Long, String>> value : values) {
+                               key = value.f0;
+                               Tuple3<Integer, Long, String> extracted = value.f1;
+                               i += extracted.f0;
+                               l += extracted.f1 + extracted.f2.length();
+                       }
+
+                       Tuple2<Integer, Long> result = new Tuple2<>(i, l);
+                       out.collect(new Tuple2<>(key, result));
+               }
+
+               @Override
+               public void reduce(Iterable<Tuple2<Long, Tuple2<Integer, Long>>> values,
+                               Collector<Tuple2<Long, Tuple2<Integer, Long>>> out) throws Exception {
+                       new Tuple2toTuple2GroupReduce().reduce(values, out);
+               }
+       }
+
+       private static class Tuple2toTuple2GroupReduce implements KvGroupReduce<Long, Tuple2<Integer, Long>,
+                       Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
+
+               @Override
+               public void combine(Iterable<Tuple2<Long, Tuple2<Integer, Long>>> values,
+                               Collector<Tuple2<Long, Tuple2<Integer, Long>>> out) throws Exception {
+                       int i = 0;
+                       long l = 0;
+                       long key = 0;
+
+                       // collapse groups
+                       for (Tuple2<Long, Tuple2<Integer, Long>> value : values) {
+                               key = value.f0;
+                               Tuple2<Integer, Long> extracted = value.f1;
+                               i += extracted.f0;
+                               l += extracted.f1;
+                       }
+
+                       Tuple2<Integer, Long> result = new Tuple2<>(i, l);
+
+                       out.collect(new Tuple2<>(key, result));
+               }
+
+               @Override
+               public void reduce(Iterable<Tuple2<Long, Tuple2<Integer, Long>>> values,
+                               Collector<Tuple2<Long, Tuple2<Integer, Long>>> out) throws Exception {
+                       combine(values, out);
+               }
+       }
+
+       private class Tuple3KvWrapper implements MapFunction<Tuple3<Integer, Long, String>,
+                       Tuple2<Long, Tuple3<Integer, Long, String>>> {
+               @Override
+               public Tuple2<Long, Tuple3<Integer, Long, String>> map(Tuple3<Integer, Long, String> value) throws Exception {
+                       return new Tuple2<>(value.f1, value);
+               }
+       }
+
+       private interface CombineAndReduceGroup<IN, INT, OUT> extends GroupCombineFunction<IN, INT>,
+                       GroupReduceFunction<INT, OUT> {
+       }
+
+       private interface KvGroupReduce<K, V, INT, OUT> extends CombineAndReduceGroup<Tuple2<K, V>, Tuple2<K, INT>,
+                       Tuple2<K, OUT>> {
+       }
+
+}
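
For readers skimming the patch, the following is a minimal sketch of how the KvGroupReduce helpers above are wired into a job, in the style the tests in this file use. It assumes the DataSet API as used throughout the diff; the input records and driver code are illustrative only and are not part of the patch. Because Tuple3KvWrapper is a non-static inner class, the snippet compiles only from inside the test class itself:

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // illustrative input; any Tuple3<Integer, Long, String> source works
        DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(
                new Tuple3<>(1, 1L, "a"),
                new Tuple3<>(2, 1L, "b"),
                new Tuple3<>(3, 2L, "c"));

        input
                // wrap each record into a (key, record) pair keyed by field f1,
                // which is what Tuple3KvWrapper does
                .map(new Tuple3KvWrapper())
                .groupBy(0)
                // pre-aggregation: combine() may be invoked on partial groups
                // before the data is shuffled
                .combineGroup(new Tuple3toTuple3GroupReduce())
                .groupBy(0)
                // final aggregation: reduce() sees each key's full group
                .reduceGroup(new Tuple3toTuple3GroupReduce())
                .output(new DiscardingOutputFormat<Tuple2<Long, Tuple3<Integer, Long, String>>>());

        env.execute();

The design lets a single object serve as both the combiner and the final reducer whenever the intermediate and final key/value shapes match, which is exactly what the CombineAndReduceGroup and KvGroupReduce interfaces encode; that is why the reduce() methods above can simply delegate to combine().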
