http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java
new file mode 100644
index 0000000..dfb3efb
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets.CustomType;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Integration tests for {@link MapFunction} and {@link RichMapFunction}.
+ */
+@RunWith(Parameterized.class)
+public class MapITCase extends MultipleProgramsTestBase {
+
+       /**
+        * Creates the test case for one parameterized run.
+        *
+        * @param mode execution mode that is handed through to the test base class
+        */
+       public MapITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       /**
+        * Runs an identity map over a data set of strings and compares the collected
+        * result with the expected lines.
+        */
+       @Test
+       public void testIdentityMapWithBasicType() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               final DataSet<String> input = CollectionDataSets.getStringDataSet(env);
+               final List<String> result = input.map(new Mapper1()).collect();
+
+               // expected output, one element per line
+               final String expected =
+                               "Hi\n"
+                               + "Hello\n"
+                               + "Hello world\n"
+                               + "Hello world, how are you?\n"
+                               + "I am fine.\n"
+                               + "Luke Skywalker\n"
+                               + "Random comment\n"
+                               + "LOL\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       /**
+        * Sets the number of execution retries and the task cancellation interval on the
+        * environment's execution config and asserts, from inside a rich map function,
+        * that the same values are returned by the execution config of the runtime context.
+        * The function is otherwise an identity map, so the collected output is compared
+        * with the expected strings as well.
+        */
+       @Test
+       public void testRuntimeContextAndExecutionConfigParams() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().setNumberOfExecutionRetries(1000);
+               env.getConfig().setTaskCancellationInterval(50000);
+
+               DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
+               DataSet<String> identityMapDs = ds.
+                       map(new RichMapFunction<String, String>() {
+                               private static final long serialVersionUID = 1L;
+
+                               @Override
+                               public String map(String value) throws Exception {
+                                       // assertEquals (instead of assertTrue on ==) reports the
+                                       // expected and the actual value when the check fails
+                                       final int retries = getRuntimeContext().getExecutionConfig().getNumberOfExecutionRetries();
+                                       Assert.assertEquals(1000, retries);
+                                       final long cancellationInterval = getRuntimeContext().getExecutionConfig().getTaskCancellationInterval();
+                                       Assert.assertEquals(50000L, cancellationInterval);
+                                       return value;
+                               }
+                       });
+
+               List<String> result = identityMapDs.collect();
+
+               String expected = "Hi\n" +
+                       "Hello\n" +
+                       "Hello world\n" +
+                       "Hello world, how are you?\n" +
+                       "I am fine.\n" +
+                       "Luke Skywalker\n" +
+                       "Random comment\n" +
+                       "LOL\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       /** Identity mapper for strings: hands every input element back unchanged. */
+       private static final class Mapper1 implements MapFunction<String, String> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public String map(String value) throws Exception {
+                       return value;
+               }
+       }
+
+       /**
+        * Runs an identity map over a data set of 3-tuples and compares the collected
+        * result with the expected tuples.
+        */
+       @Test
+       public void testIdentityMapWithTuple() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               final DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+               final List<Tuple3<Integer, Long, String>> result = input.map(new Mapper2()).collect();
+
+               // expected output, one tuple per line
+               final String expected =
+                               "1,1,Hi\n"
+                               + "2,2,Hello\n"
+                               + "3,2,Hello world\n"
+                               + "4,3,Hello world, how are you?\n"
+                               + "5,3,I am fine.\n"
+                               + "6,3,Luke Skywalker\n"
+                               + "7,4,Comment#1\n"
+                               + "8,4,Comment#2\n"
+                               + "9,4,Comment#3\n"
+                               + "10,4,Comment#4\n"
+                               + "11,5,Comment#5\n"
+                               + "12,5,Comment#6\n"
+                               + "13,5,Comment#7\n"
+                               + "14,5,Comment#8\n"
+                               + "15,5,Comment#9\n"
+                               + "16,6,Comment#10\n"
+                               + "17,6,Comment#11\n"
+                               + "18,6,Comment#12\n"
+                               + "19,6,Comment#13\n"
+                               + "20,6,Comment#14\n"
+                               + "21,6,Comment#15\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       private static class Mapper2 implements MapFunction<Tuple3<Integer, 
Long, String>, Tuple3<Integer, Long, String>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, 
String> value)
+                               throws Exception {
+                       return value;
+               }
+       }
+
+       /**
+        * Maps a data set of {@link CustomType} elements to 3-tuples and compares the
+        * collected result with the expected tuples.
+        */
+       @Test
+       public void testTypeConversionMapperCustomToTuple() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               final DataSet<CustomType> input = CollectionDataSets.getCustomTypeDataSet(env);
+               final List<Tuple3<Integer, Long, String>> result = input.map(new Mapper3()).collect();
+
+               // expected output, one tuple per line
+               final String expected =
+                               "1,0,Hi\n"
+                               + "2,1,Hello\n"
+                               + "2,2,Hello world\n"
+                               + "3,3,Hello world, how are you?\n"
+                               + "3,4,I am fine.\n"
+                               + "3,5,Luke Skywalker\n"
+                               + "4,6,Comment#1\n"
+                               + "4,7,Comment#2\n"
+                               + "4,8,Comment#3\n"
+                               + "4,9,Comment#4\n"
+                               + "5,10,Comment#5\n"
+                               + "5,11,Comment#6\n"
+                               + "5,12,Comment#7\n"
+                               + "5,13,Comment#8\n"
+                               + "5,14,Comment#9\n"
+                               + "6,15,Comment#10\n"
+                               + "6,16,Comment#11\n"
+                               + "6,17,Comment#12\n"
+                               + "6,18,Comment#13\n"
+                               + "6,19,Comment#14\n"
+                               + "6,20,Comment#15\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       /**
+        * Converts a {@link CustomType} into a 3-tuple of its int, long and string fields.
+        * The same output tuple instance is refilled and returned on every call.
+        */
+       private static final class Mapper3 implements MapFunction<CustomType, Tuple3<Integer, Long, String>> {
+               private static final long serialVersionUID = 1L;
+               private final Tuple3<Integer, Long, String> out = new Tuple3<>();
+
+               @Override
+               public Tuple3<Integer, Long, String> map(CustomType value) throws Exception {
+                       // fill positions 0, 1 and 2 in one call
+                       out.setFields(value.myInt, value.myLong, value.myString);
+                       return out;
+               }
+       }
+
+       /**
+        * Maps a data set of 3-tuples to plain strings and compares the collected result
+        * with the expected lines.
+        */
+       @Test
+       public void testTypeConversionMapperTupleToBasic() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               final DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+               final List<String> result = input.map(new Mapper4()).collect();
+
+               // expected output, one element per line
+               final String expected =
+                               "Hi\n"
+                               + "Hello\n"
+                               + "Hello world\n"
+                               + "Hello world, how are you?\n"
+                               + "I am fine.\n"
+                               + "Luke Skywalker\n"
+                               + "Comment#1\n"
+                               + "Comment#2\n"
+                               + "Comment#3\n"
+                               + "Comment#4\n"
+                               + "Comment#5\n"
+                               + "Comment#6\n"
+                               + "Comment#7\n"
+                               + "Comment#8\n"
+                               + "Comment#9\n"
+                               + "Comment#10\n"
+                               + "Comment#11\n"
+                               + "Comment#12\n"
+                               + "Comment#13\n"
+                               + "Comment#14\n"
+                               + "Comment#15\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       /** Extracts the string at position 2 (field {@code f2}) of every input tuple. */
+       private static final class Mapper4 implements MapFunction<Tuple3<Integer, Long, String>, String> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public String map(Tuple3<Integer, Long, String> value) throws Exception {
+                       return value.f2;
+               }
+       }
+
+       /**
+        * Maps 3-tuples with a function that increments the integer field and swaps the
+        * second and third fields, then compares the collected result with the expected tuples.
+        */
+       @Test
+       public void testMapperOnTupleIncrementIntegerFieldReorderSecondAndThirdFields() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               final DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+               final List<Tuple3<Integer, String, Long>> result = input.map(new Mapper5()).collect();
+
+               // expected output, one tuple per line
+               final String expected =
+                               "2,Hi,1\n"
+                               + "3,Hello,2\n"
+                               + "4,Hello world,2\n"
+                               + "5,Hello world, how are you?,3\n"
+                               + "6,I am fine.,3\n"
+                               + "7,Luke Skywalker,3\n"
+                               + "8,Comment#1,4\n"
+                               + "9,Comment#2,4\n"
+                               + "10,Comment#3,4\n"
+                               + "11,Comment#4,4\n"
+                               + "12,Comment#5,5\n"
+                               + "13,Comment#6,5\n"
+                               + "14,Comment#7,5\n"
+                               + "15,Comment#8,5\n"
+                               + "16,Comment#9,5\n"
+                               + "17,Comment#10,6\n"
+                               + "18,Comment#11,6\n"
+                               + "19,Comment#12,6\n"
+                               + "20,Comment#13,6\n"
+                               + "21,Comment#14,6\n"
+                               + "22,Comment#15,6\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       private static class Mapper5 implements MapFunction<Tuple3<Integer, 
Long, String>, Tuple3<Integer, String, Long>> {
+               private static final long serialVersionUID = 1L;
+               private final Tuple3<Integer, String, Long> out = new 
Tuple3<Integer, String, Long>();
+
+               @Override
+               public Tuple3<Integer, String, Long> map(Tuple3<Integer, Long, 
String> value)
+                               throws Exception {
+                       Integer incr = Integer.valueOf(value.f0.intValue() + 1);
+                       out.setFields(incr, value.f2, value.f1);
+                       return out;
+               }
+       }
+
+       /**
+        * Maps {@link CustomType} elements with a function that lower-cases their string
+        * field and compares the collected result with the expected lines.
+        */
+       @Test
+       public void testMapperOnCustomLowercaseString() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               final DataSet<CustomType> input = CollectionDataSets.getCustomTypeDataSet(env);
+               final List<CustomType> result = input.map(new Mapper6()).collect();
+
+               // expected output, one element per line
+               final String expected =
+                               "1,0,hi\n"
+                               + "2,1,hello\n"
+                               + "2,2,hello world\n"
+                               + "3,3,hello world, how are you?\n"
+                               + "3,4,i am fine.\n"
+                               + "3,5,luke skywalker\n"
+                               + "4,6,comment#1\n"
+                               + "4,7,comment#2\n"
+                               + "4,8,comment#3\n"
+                               + "4,9,comment#4\n"
+                               + "5,10,comment#5\n"
+                               + "5,11,comment#6\n"
+                               + "5,12,comment#7\n"
+                               + "5,13,comment#8\n"
+                               + "5,14,comment#9\n"
+                               + "6,15,comment#10\n"
+                               + "6,16,comment#11\n"
+                               + "6,17,comment#12\n"
+                               + "6,18,comment#13\n"
+                               + "6,19,comment#14\n"
+                               + "6,20,comment#15\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       /**
+        * Copies a {@link CustomType} into a reused output object, lower-casing its string
+        * field. The same output instance is refilled and returned on every call.
+        */
+       private static class Mapper6 implements MapFunction<CustomType, CustomType> {
+               private static final long serialVersionUID = 1L;
+               private final CustomType out = new CustomType();
+
+               @Override
+               public CustomType map(CustomType value) throws Exception {
+                       out.myInt = value.myInt;
+                       out.myLong = value.myLong;
+                       // Locale.ROOT keeps the result independent of the JVM default locale;
+                       // under a Turkish default locale "I" would not lower-case to "i"
+                       out.myString = value.myString.toLowerCase(Locale.ROOT);
+                       return out;
+               }
+       }
+
+       /**
+        * Tests a mapper whose UDF returns its input object: the first field of every
+        * tuple is incremented and the collected result is compared with the expected tuples.
+        */
+       @Test
+       public void test() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               final DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+               final List<Tuple3<Integer, Long, String>> result = input.map(new Mapper7()).collect();
+
+               // expected output, one tuple per line
+               final String expected =
+                               "2,1,Hi\n"
+                               + "3,2,Hello\n"
+                               + "4,2,Hello world\n"
+                               + "5,3,Hello world, how are you?\n"
+                               + "6,3,I am fine.\n"
+                               + "7,3,Luke Skywalker\n"
+                               + "8,4,Comment#1\n"
+                               + "9,4,Comment#2\n"
+                               + "10,4,Comment#3\n"
+                               + "11,4,Comment#4\n"
+                               + "12,5,Comment#5\n"
+                               + "13,5,Comment#6\n"
+                               + "14,5,Comment#7\n"
+                               + "15,5,Comment#8\n"
+                               + "16,5,Comment#9\n"
+                               + "17,6,Comment#10\n"
+                               + "18,6,Comment#11\n"
+                               + "19,6,Comment#12\n"
+                               + "20,6,Comment#13\n"
+                               + "21,6,Comment#14\n"
+                               + "22,6,Comment#15\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       private static class Mapper7 implements MapFunction<Tuple3<Integer, 
Long, String>, Tuple3<Integer, Long, String>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, 
String> value)
+                               throws Exception {
+                       Integer incr = Integer.valueOf(value.f0.intValue() + 1);
+                       value.setField(incr, 0);
+                       return value;
+               }
+       }
+
+       /**
+        * Maps 3-tuples with a rich function that reads the broadcast set "ints" and
+        * compares the collected result with the expected tuples.
+        */
+       @Test
+       public void testMapWithBroadcastSet() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               final DataSet<Integer> broadcastInts = CollectionDataSets.getIntegerDataSet(env);
+               final DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+               final List<Tuple3<Integer, Long, String>> result = input
+                               .map(new RichMapper1())
+                               .withBroadcastSet(broadcastInts, "ints")
+                               .collect();
+
+               // expected output, one tuple per line; the first field is 55 in every row
+               final String expected =
+                               "55,1,Hi\n"
+                               + "55,2,Hello\n"
+                               + "55,2,Hello world\n"
+                               + "55,3,Hello world, how are you?\n"
+                               + "55,3,I am fine.\n"
+                               + "55,3,Luke Skywalker\n"
+                               + "55,4,Comment#1\n"
+                               + "55,4,Comment#2\n"
+                               + "55,4,Comment#3\n"
+                               + "55,4,Comment#4\n"
+                               + "55,5,Comment#5\n"
+                               + "55,5,Comment#6\n"
+                               + "55,5,Comment#7\n"
+                               + "55,5,Comment#8\n"
+                               + "55,5,Comment#9\n"
+                               + "55,6,Comment#10\n"
+                               + "55,6,Comment#11\n"
+                               + "55,6,Comment#12\n"
+                               + "55,6,Comment#13\n"
+                               + "55,6,Comment#14\n"
+                               + "55,6,Comment#15\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       private static class RichMapper1 extends 
RichMapFunction<Tuple3<Integer, Long, String>,
+       Tuple3<Integer, Long, String>> {
+               private static final long serialVersionUID = 1L;
+               private final Tuple3<Integer, Long, String> out = new 
Tuple3<Integer, Long, String>();
+               private Integer f2Replace = 0;
+
+               @Override
+               public void open(Configuration config) {
+                       Collection<Integer> ints = 
this.getRuntimeContext().getBroadcastVariable("ints");
+                       int sum = 0;
+                       for (Integer i : ints) {
+                               sum += i;
+                       }
+                       f2Replace = sum;
+               }
+
+               @Override
+               public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, 
String> value)
+                               throws Exception {
+                       out.setFields(f2Replace, value.f1, value.f2);
+                       return out;
+               }
+       }
+
+       /** Configuration key under which the test value is stored. */
+       static final String TEST_KEY = "testVariable";
+       /** Value stored under {@link #TEST_KEY}. */
+       static final int TEST_VALUE = 666;
+
+       /**
+        * Attaches a {@link Configuration} holding {@link #TEST_VALUE} under {@link #TEST_KEY}
+        * to a map operator via {@code withParameters} and compares the collected result
+        * with the expected tuples.
+        */
+       @Test
+       public void testPassingConfigurationObject() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               final DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.getSmall3TupleDataSet(env);
+
+               final Configuration parameters = new Configuration();
+               parameters.setInteger(TEST_KEY, TEST_VALUE);
+
+               final List<Tuple3<Integer, Long, String>> result = input
+                               .map(new RichMapper2())
+                               .withParameters(parameters)
+                               .collect();
+
+               // expected output, one tuple per line
+               final String expected =
+                               "1,1,Hi\n"
+                               + "2,2,Hello\n"
+                               + "3,2,Hello world";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       private static class RichMapper2 extends 
RichMapFunction<Tuple3<Integer, Long, String>,
+       Tuple3<Integer, Long, String>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void open(Configuration config) {
+                       int val = config.getInteger(TEST_KEY, -1);
+                       Assert.assertEquals(TEST_VALUE, val);
+               }
+
+               @Override
+               public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, 
String> value) {
+                       return value;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java
new file mode 100644
index 0000000..bb7f705
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Integration tests for {@link MapPartitionFunction}.
+ */
+@SuppressWarnings("serial")
+public class MapPartitionITCase extends JavaProgramTestBase {
+
+       private static final String IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 
8\n"
+                       + "1 1\n2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n1 1\n2 2\n2 
2\n3 0\n4 4\n"
+                       + "5 9\n7 7\n8 8\n1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 
8\n";
+
+       private static final String RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 
12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
+
+       private List<Tuple2<String, String>> input = new ArrayList<>();
+
+       private List<Tuple2<String, Integer>> expected = new ArrayList<>();
+
+       private List<Tuple2<String, Integer>> result = new ArrayList<>();
+
+       @Override
+       protected void preSubmit() throws Exception {
+
+               // create input
+               for (String s :IN.split("\n")) {
+                       String[] fields = s.split(" ");
+                       input.add(new Tuple2<String, String>(fields[0], 
fields[1]));
+               }
+
+               // create expected
+               for (String s : RESULT.split("\n")) {
+                       String[] fields = s.split(" ");
+                       expected.add(new Tuple2<String, Integer>(fields[0], 
Integer.parseInt(fields[1])));
+               }
+
+       }
+
+       @Override
+       protected void postSubmit() {
+               compareResultCollections(expected, result, new 
TestBaseUtils.TupleComparator<Tuple2<String, Integer>>());
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<String, String>> data = 
env.fromCollection(input);
+
+               data.mapPartition(new TestMapPartition()).output(new 
LocalCollectionOutputFormat<Tuple2<String, Integer>>(result));
+
+               env.execute();
+       }
+
+       private static class TestMapPartition implements 
MapPartitionFunction<Tuple2<String, String>, Tuple2<String, Integer>> {
+
+               @Override
+               public void mapPartition(Iterable<Tuple2<String, String>> 
values, Collector<Tuple2<String, Integer>> out) {
+                       for (Tuple2<String, String> value : values) {
+                               String keyString = value.f0;
+                               String valueString = value.f1;
+
+                               int keyInt = Integer.parseInt(keyString);
+                               int valueInt = Integer.parseInt(valueString);
+
+                               if (keyInt + valueInt < 10) {
+                                       out.collect(new Tuple2<String, 
Integer>(valueString, keyInt + 10));
+                               }
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java
new file mode 100644
index 0000000..37659d2
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * These check whether the object-reuse execution mode does really reuse 
objects.
+ *
+ * <p>Every test runs twice, once per entry returned by {@code executionModes()}: with object
+ * reuse disabled and with it enabled. The constructor always selects the CLUSTER execution mode.
+ * The two reduce tests expect the same sum for both settings; the two group-reduce tests expect
+ * different output depending on {@code isObjectReuseEnabled()}.
+ */
+@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
+public class ObjectReuseITCase extends MultipleProgramsTestBase {
+
+       private static final List<Tuple2<String, Integer>> REDUCE_DATA =
+               Arrays.asList(
+                       new Tuple2<>("a", 1), new Tuple2<>("a", 2),
+                       new Tuple2<>("a", 3), new Tuple2<>("a", 4),
+                       new Tuple2<>("a", 50)); // values 1 + 2 + 3 + 4 + 50 add up to the 60 asserted by the reduce tests
+
+       private static final List<Tuple2<String, Integer>> GROUP_REDUCE_DATA =
+               Arrays.asList(
+                       new Tuple2<>("a", 1), new Tuple2<>("a", 2),
+                       new Tuple2<>("a", 3), new Tuple2<>("a", 4),
+                       new Tuple2<>("a", 5)); // five records that all share the key "a"
+
+       private final boolean objectReuse; // true when this parameterized run enables object reuse
+
+       public ObjectReuseITCase(boolean objectReuse) {
+               super(TestExecutionMode.CLUSTER); // the mode is fixed; only the reuse flag is parameterized
+               this.objectReuse = objectReuse;
+       }
+
+       @Test
+       public void testKeyedReduce() throws Exception {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               if (objectReuse) { // apply this run's reuse setting to the environment
+                       env.getConfig().enableObjectReuse();
+               } else {
+                       env.getConfig().disableObjectReuse();
+               }
+
+               DataSet<Tuple2<String, Integer>> input = 
env.fromCollection(REDUCE_DATA);
+
+               DataSet<Tuple2<String, Integer>> result = input
+                       .groupBy(0)
+                       .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+
+                               @Override
+                               public Tuple2<String, Integer> 
reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
+                                       value2.f1 += value1.f1; // accumulate into the second argument in place ...
+                                       return value2; // ... and hand that same instance back
+                               }
+                       });
+
+               Tuple2<String, Integer> res = result.collect().get(0);
+               assertEquals(new Tuple2<>("a", 60), res); // same sum expected with and without object reuse
+       }
+
+       @Test
+       public void testGlobalReduce() throws Exception {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               if (objectReuse) { // apply this run's reuse setting to the environment
+                       env.getConfig().enableObjectReuse();
+               } else {
+                       env.getConfig().disableObjectReuse();
+               }
+
+               DataSet<Tuple2<String, Integer>> input = 
env.fromCollection(REDUCE_DATA);
+
+               DataSet<Tuple2<String, Integer>> result = input.reduce(
+                       new ReduceFunction<Tuple2<String, Integer>>() {
+
+                               @Override
+                               public Tuple2<String, Integer> reduce(
+                                               Tuple2<String, Integer> value1,
+                                               Tuple2<String, Integer> value2) 
{
+
+                                       if (value1.f1 % 3 == 0) { // varies which of the two input instances is mutated and returned
+                                               value1.f1 += value2.f1;
+                                               return value1;
+                                       } else {
+                                               value2.f1 += value1.f1;
+                                               return value2;
+                                       }
+                               }
+
+                       });
+
+               Tuple2<String, Integer> res = result.collect().get(0);
+               assertEquals(new Tuple2<>("a", 60), res); // same sum expected with and without object reuse
+       }
+
+       @Test
+       public void testKeyedGroupReduce() throws Exception {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               if (objectReuse) { // apply this run's reuse setting to the environment
+                       env.getConfig().enableObjectReuse();
+               } else {
+                       env.getConfig().disableObjectReuse();
+               }
+
+               DataSet<Tuple2<String, Integer>> input = 
env.fromCollection(GROUP_REDUCE_DATA);
+
+               DataSet<Tuple2<String, Integer>> result = 
input.groupBy(0).reduceGroup(
+                       new GroupReduceFunction<Tuple2<String, Integer>, 
Tuple2<String, Integer>>() {
+
+                               @Override
+                               public void reduce(Iterable<Tuple2<String, 
Integer>> values, Collector<Tuple2<String, Integer>> out) {
+                                       List<Tuple2<String, Integer>> list = 
new ArrayList<>();
+                                       for (Tuple2<String, Integer> val : 
values) {
+                                               list.add(val); // buffers the iterated references themselves; no copies are made
+                                       }
+
+                                       for (Tuple2<String, Integer> val : 
list) {
+                                               out.collect(val); // emit only after the whole group has been buffered
+                                       }
+                               }
+                       });
+
+               List<Tuple2<String, Integer>> is = result.collect();
+               Collections.sort(is, new TupleComparator<Tuple2<String, 
Integer>>());
+
+               List<Tuple2<String, Integer>> expected = 
env.getConfig().isObjectReuseEnabled() ?
+                       Arrays.asList(new Tuple2<>("a", 4), new Tuple2<>("a", 
4),
+                               new Tuple2<>("a", 5), new Tuple2<>("a", 5), new 
Tuple2<>("a", 5)) : // reuse enabled: repeated values expected (presumably buffered references alias reused instances; confirm against the runtime)
+                       Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 
2),
+                               new Tuple2<>("a", 3), new Tuple2<>("a", 4), new 
Tuple2<>("a", 5)); // reuse disabled: every input record is expected unchanged
+
+               assertEquals(expected, is);
+       }
+
+       @Test
+       public void testGlobalGroupReduce() throws Exception {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               if (objectReuse) { // apply this run's reuse setting to the environment
+                       env.getConfig().enableObjectReuse();
+               } else {
+                       env.getConfig().disableObjectReuse();
+               }
+
+               DataSet<Tuple2<String, Integer>> input = 
env.fromCollection(GROUP_REDUCE_DATA);
+
+               DataSet<Tuple2<String, Integer>> result = input.reduceGroup(
+                       new GroupReduceFunction<Tuple2<String, Integer>, 
Tuple2<String, Integer>>() {
+
+                               @Override
+                               public void reduce(Iterable<Tuple2<String, 
Integer>> values, Collector<Tuple2<String, Integer>> out) {
+                                       List<Tuple2<String, Integer>> list = 
new ArrayList<>();
+                                       for (Tuple2<String, Integer> val : 
values) {
+                                               list.add(val); // buffers the iterated references themselves; no copies are made
+                                       }
+
+                                       for (Tuple2<String, Integer> val : 
list) {
+                                               out.collect(val); // emit only after all records have been buffered
+                                       }
+                               }
+                       });
+
+               List<Tuple2<String, Integer>> is = result.collect();
+               Collections.sort(is, new TupleComparator<Tuple2<String, 
Integer>>());
+
+               List<Tuple2<String, Integer>> expected = 
env.getConfig().isObjectReuseEnabled() ?
+                       Arrays.asList(new Tuple2<>("a", 4), new Tuple2<>("a", 
4),
+                               new Tuple2<>("a", 5), new Tuple2<>("a", 5), new 
Tuple2<>("a", 5)) : // reuse enabled: repeated values expected (presumably buffered references alias reused instances; confirm against the runtime)
+                       Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 
2),
+                               new Tuple2<>("a", 3), new Tuple2<>("a", 4), new 
Tuple2<>("a", 5)); // reuse disabled: every input record is expected unchanged
+
+               assertEquals(expected, is);
+       }
+
+       @Parameterized.Parameters(name = "Execution mode = CLUSTER, Reuse = 
{0}")
+       public static Collection<Object[]> executionModes() { // one run without and one run with object reuse
+               return Arrays.asList(
+                       new Object[] { false, },
+                       new Object[] { true });
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/OuterJoinITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/OuterJoinITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/operators/OuterJoinITCase.java
new file mode 100644
index 0000000..31e5062
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/OuterJoinITCase.java
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.RichFlatJoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets.CustomType;
+import org.apache.flink.test.operators.util.CollectionDataSets.POJO;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Integration tests for {@link JoinFunction}, {@link FlatJoinFunction},
+ * and {@link RichFlatJoinFunction}.
+ */
+@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
+public class OuterJoinITCase extends MultipleProgramsTestBase {
+
+       public OuterJoinITCase(TestExecutionMode mode) {
+               super(mode); // forward the execution mode of this parameterized run to MultipleProgramsTestBase
+       }
+
+       /**
+        * Runs the left outer join scenario with the {@code REPARTITION_SORT_MERGE} hint.
+        */
+       @Test
+       public void testLeftOuterJoin1() throws Exception {
+               final JoinHint strategy = JoinHint.REPARTITION_SORT_MERGE;
+               testLeftOuterJoinOnTuplesWithKeyPositions(strategy);
+       }
+
+       /**
+        * Runs the left outer join scenario with the {@code REPARTITION_HASH_FIRST} hint.
+        */
+       @Test
+       public void testLeftOuterJoin2() throws Exception {
+               final JoinHint strategy = JoinHint.REPARTITION_HASH_FIRST;
+               testLeftOuterJoinOnTuplesWithKeyPositions(strategy);
+       }
+
+       /**
+        * Runs the left outer join scenario with the {@code REPARTITION_HASH_SECOND} hint.
+        */
+       @Test
+       public void testLeftOuterJoin3() throws Exception {
+               final JoinHint strategy = JoinHint.REPARTITION_HASH_SECOND;
+               testLeftOuterJoinOnTuplesWithKeyPositions(strategy);
+       }
+
+       /**
+        * Runs the left outer join scenario with the {@code BROADCAST_HASH_SECOND} hint.
+        */
+       @Test
+       public void testLeftOuterJoin4() throws Exception {
+               final JoinHint strategy = JoinHint.BROADCAST_HASH_SECOND;
+               testLeftOuterJoinOnTuplesWithKeyPositions(strategy);
+       }
+
+       /**
+        * Expects an {@link InvalidProgramException} when the left outer join scenario is run
+        * with the {@code BROADCAST_HASH_FIRST} hint.
+        */
+       @Test(expected = InvalidProgramException.class)
+       public void testLeftOuterJoin5() throws Exception {
+               final JoinHint strategy = JoinHint.BROADCAST_HASH_FIRST;
+               testLeftOuterJoinOnTuplesWithKeyPositions(strategy);
+       }
+
+       /**
+        * Left-outer-joins the small 3-tuple data set with the small 5-tuple data set on field 0
+        * of both inputs, applies {@code T3T5FlatJoin}, and compares the collected pairs with the
+        * expected rows.
+        *
+        * @param hint join strategy hint handed to {@code leftOuterJoin}
+        * @throws Exception if building or executing the program fails
+        */
+       private void testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception {
+               final String expected = "Hi,Hallo\n" +
+                               "Hello,Hallo Welt\n" +
+                               "Hello,Hallo Welt wie\n" +
+                               "Hello world,null\n";
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> left = CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.getSmall5TupleDataSet(env);
+
+               // UDF join on tuples with key field positions
+               DataSet<Tuple2<String, String>> joined = left
+                       .leftOuterJoin(right, hint)
+                       .where(0)
+                       .equalTo(0)
+                       .with(new T3T5FlatJoin());
+
+               compareResultAsTuples(joined.collect(), expected);
+       }
+
+       /**
+        * Runs the right outer join scenario with the {@code REPARTITION_SORT_MERGE} hint.
+        */
+       @Test
+       public void testRightOuterJoin1() throws Exception {
+               final JoinHint strategy = JoinHint.REPARTITION_SORT_MERGE;
+               testRightOuterJoinOnTuplesWithKeyPositions(strategy);
+       }
+
+       /**
+        * Runs the right outer join scenario with the {@code REPARTITION_HASH_FIRST} hint.
+        */
+       @Test
+       public void testRightOuterJoin2() throws Exception {
+               final JoinHint strategy = JoinHint.REPARTITION_HASH_FIRST;
+               testRightOuterJoinOnTuplesWithKeyPositions(strategy);
+       }
+
+       /**
+        * Runs the right outer join scenario with the {@code REPARTITION_HASH_SECOND} hint.
+        */
+       @Test
+       public void testRightOuterJoin3() throws Exception {
+               final JoinHint strategy = JoinHint.REPARTITION_HASH_SECOND;
+               testRightOuterJoinOnTuplesWithKeyPositions(strategy);
+       }
+
+       /**
+        * Runs the right outer join scenario with the {@code BROADCAST_HASH_FIRST} hint.
+        */
+       @Test
+       public void testRightOuterJoin4() throws Exception {
+               final JoinHint strategy = JoinHint.BROADCAST_HASH_FIRST;
+               testRightOuterJoinOnTuplesWithKeyPositions(strategy);
+       }
+
+       /**
+        * Expects an {@link InvalidProgramException} when the right outer join scenario is run
+        * with the {@code BROADCAST_HASH_SECOND} hint.
+        */
+       @Test(expected = InvalidProgramException.class)
+       public void testRightOuterJoin5() throws Exception {
+               final JoinHint strategy = JoinHint.BROADCAST_HASH_SECOND;
+               testRightOuterJoinOnTuplesWithKeyPositions(strategy);
+       }
+
+       /**
+        * Right-outer-joins the small 3-tuple data set with the small 5-tuple data set on field 1
+        * of both inputs, applies {@code T3T5FlatJoin}, and compares the collected pairs with the
+        * expected rows.
+        *
+        * @param hint join strategy hint handed to {@code rightOuterJoin}
+        * @throws Exception if building or executing the program fails
+        */
+       private void testRightOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception {
+               final String expected = "Hi,Hallo\n" +
+                               "Hello,Hallo Welt\n" +
+                               "null,Hallo Welt wie\n" +
+                               "Hello world,Hallo Welt\n";
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> left = CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.getSmall5TupleDataSet(env);
+
+               // UDF join on tuples with key field positions
+               DataSet<Tuple2<String, String>> joined = left
+                       .rightOuterJoin(right, hint)
+                       .where(1)
+                       .equalTo(1)
+                       .with(new T3T5FlatJoin());
+
+               compareResultAsTuples(joined.collect(), expected);
+       }
+
+       /**
+        * Runs the full outer join scenario with the {@code REPARTITION_SORT_MERGE} hint.
+        */
+       @Test
+       public void testFullOuterJoin1() throws Exception {
+               final JoinHint strategy = JoinHint.REPARTITION_SORT_MERGE;
+               testFullOuterJoinOnTuplesWithKeyPositions(strategy);
+       }
+
+       /**
+        * Runs the full outer join scenario with the {@code REPARTITION_HASH_FIRST} hint.
+        */
+       @Test
+       public void testFullOuterJoin2() throws Exception {
+               final JoinHint strategy = JoinHint.REPARTITION_HASH_FIRST;
+               testFullOuterJoinOnTuplesWithKeyPositions(strategy);
+       }
+
+       /**
+        * Runs the full outer join scenario with the {@code REPARTITION_HASH_SECOND} hint.
+        */
+       @Test
+       public void testFullOuterJoin3() throws Exception {
+               final JoinHint strategy = JoinHint.REPARTITION_HASH_SECOND;
+               testFullOuterJoinOnTuplesWithKeyPositions(strategy);
+       }
+
+       /**
+        * Expects an {@link InvalidProgramException} when the full outer join scenario is run
+        * with the {@code BROADCAST_HASH_FIRST} hint.
+        */
+       @Test(expected = InvalidProgramException.class)
+       public void testFullOuterJoin4() throws Exception {
+               final JoinHint strategy = JoinHint.BROADCAST_HASH_FIRST;
+               testFullOuterJoinOnTuplesWithKeyPositions(strategy);
+       }
+
+       /**
+        * Expects an {@link InvalidProgramException} when the full outer join scenario is run
+        * with the {@code BROADCAST_HASH_SECOND} hint.
+        */
+       @Test(expected = InvalidProgramException.class)
+       public void testFullOuterJoin5() throws Exception {
+               final JoinHint strategy = JoinHint.BROADCAST_HASH_SECOND;
+               testFullOuterJoinOnTuplesWithKeyPositions(strategy);
+       }
+
+       /**
+        * Full-outer-joins the small 3-tuple data set (field 0) with the small 5-tuple data set
+        * (field 2), applies {@code T3T5FlatJoin}, and compares the collected pairs with the
+        * expected rows.
+        *
+        * @param hint join strategy hint handed to {@code fullOuterJoin}
+        * @throws Exception if building or executing the program fails
+        */
+       private void testFullOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception {
+               final String expected = "null,Hallo\n" +
+                               "Hi,Hallo Welt\n" +
+                               "Hello,Hallo Welt wie\n" +
+                               "Hello world,null\n";
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> left = CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.getSmall5TupleDataSet(env);
+
+               // UDF join on tuples with key field positions
+               DataSet<Tuple2<String, String>> joined = left
+                       .fullOuterJoin(right, hint)
+                       .where(0)
+                       .equalTo(2)
+                       .with(new T3T5FlatJoin());
+
+               compareResultAsTuples(joined.collect(), expected);
+       }
+
+       /**
+        * Full outer join on tuples using multiple key field positions: fields (0, 1) of the
+        * 3-tuple input against fields (0, 4) of the 5-tuple input.
+        */
+       @Test
+       public void testJoinOnTuplesWithCompositeKeyPositions() throws Exception {
+               final String expected = "Hi,Hallo\n" +
+                               "Hello,Hallo Welt\n" +
+                               "Hello world,null\n" +
+                               "null,Hallo Welt wie\n";
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> left = CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.getSmall5TupleDataSet(env);
+
+               // UDF join on tuples with multiple key field positions
+               DataSet<Tuple2<String, String>> joined = left
+                       .fullOuterJoin(right)
+                       .where(0, 1)
+                       .equalTo(0, 4)
+                       .with(new T3T5FlatJoin());
+
+               compareResultAsTuples(joined.collect(), expected);
+       }
+
+       /**
+        * Full outer join whose join function ({@code T3T5BCJoin}) additionally receives the
+        * integer data set as the broadcast set named {@code "ints"}.
+        */
+       @Test
+       public void testJoinWithBroadcastSet() throws Exception {
+               final String expected = "Hi,Hallo,55\n" +
+                               "Hi,Hallo Welt wie,55\n" +
+                               "Hello,Hallo Welt,55\n" +
+                               "Hello world,Hallo Welt,55\n";
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               // data set that is broadcast to the join function
+               DataSet<Integer> broadcastInts = CollectionDataSets.getIntegerDataSet(env);
+
+               DataSet<Tuple3<Integer, Long, String>> left = CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.getSmall5TupleDataSet(env);
+
+               // join with broadcast set
+               DataSet<Tuple3<String, String, Integer>> joined = left
+                       .fullOuterJoin(right)
+                       .where(1)
+                       .equalTo(4)
+                       .with(new T3T5BCJoin())
+                       .withBroadcastSet(broadcastInts, "ints");
+
+               compareResultAsTuples(joined.collect(), expected);
+       }
+
+       /**
+        * Full outer join of a custom-type input keyed through {@code KeySelector1} with a tuple
+        * input keyed by field position 0.
+        */
+       @Test
+       public void testJoinWithMixedKeyTypes1() throws Exception {
+               final String expected = "Hi,Hi\n" +
+                               "Hello,Hello\n" +
+                               "Hello world,Hello\n" +
+                               "null,Hello world\n";
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<CustomType> customs = CollectionDataSets.getSmallCustomTypeDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.getSmall3TupleDataSet(env);
+
+               // key extractor on the custom-type side, key field position on the tuple side
+               DataSet<Tuple2<String, String>> joined = customs
+                       .fullOuterJoin(tuples)
+                       .where(new KeySelector1())
+                       .equalTo(0)
+                       .with(new CustT3Join());
+
+               compareResultAsTuples(joined.collect(), expected);
+       }
+
+       /**
+        * Key extractor that uses the {@code myInt} field of a {@link CustomType} as the join key.
+        */
+       private static class KeySelector1 implements KeySelector<CustomType, Integer> {
+               @Override
+               public Integer getKey(CustomType value) {
+                       final Integer key = value.myInt;
+                       return key;
+               }
+       }
+
+       /**
+        * Full outer join of a tuple input keyed by field position 1 with a custom-type input
+        * keyed through {@code KeySelector2}.
+        */
+       @Test
+       public void testJoinWithMixedKeyTypes2() throws Exception {
+               final String expected = "null,Hi\n" +
+                               "Hi,Hello\n" +
+                               "Hello,Hello world\n" +
+                               "Hello world,Hello world\n";
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<CustomType> customs = CollectionDataSets.getSmallCustomTypeDataSet(env);
+
+               // key field position on the tuple side, key extractor on the custom-type side
+               DataSet<Tuple2<String, String>> joined = tuples
+                       .fullOuterJoin(customs)
+                       .where(1)
+                       .equalTo(new KeySelector2())
+                       .with(new T3CustJoin());
+
+               compareResultAsTuples(joined.collect(), expected);
+       }
+
+       /**
+        * Key extractor that uses the {@code myLong} field of a {@link CustomType} as the join key.
+        */
+       private static class KeySelector2 implements KeySelector<CustomType, Long> {
+               @Override
+               public Long getKey(CustomType value) {
+                       final Long key = value.myLong;
+                       return key;
+               }
+       }
+
+       /**
+        * Full outer join on tuples where both sides are keyed by selectors that return a
+        * {@code Tuple2} key ({@code KeySelector3} and {@code KeySelector4}).
+        */
+       @Test
+       public void testJoinWithTupleReturningKeySelectors() throws Exception {
+               final String expected = "Hi,Hallo\n" +
+                               "Hello,Hallo Welt\n" +
+                               "Hello world,null\n" +
+                               "null,Hallo Welt wie\n";
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> left = CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.getSmall5TupleDataSet(env);
+
+               // UDF join on tuples with tuple-returning key selectors
+               DataSet<Tuple2<String, String>> joined = left
+                       .fullOuterJoin(right)
+                       .where(new KeySelector3()) // fields 0, 1
+                       .equalTo(new KeySelector4()) // fields 0, 4
+                       .with(new T3T5FlatJoin());
+
+               compareResultAsTuples(joined.collect(), expected);
+       }
+
+       /**
+        * Key extractor that builds a composite key from fields {@code f0} and {@code f1} of a 3-tuple.
+        */
+       private static class KeySelector3 implements KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> t) {
+                       final Integer first = t.f0;
+                       final Long second = t.f1;
+                       return new Tuple2<Integer, Long>(first, second);
+               }
+       }
+
+       /**
+        * Key extractor that builds a composite key from fields {@code f0} and {@code f4} of a 5-tuple.
+        */
+       private static class KeySelector4 implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) {
+                       final Integer first = t.f0;
+                       final Long second = t.f4;
+                       return new Tuple2<Integer, Long>(first, second);
+               }
+       }
+
+       /**
+        * Full outer join of a nested POJO field ({@code nestedPojo.longNumber}) against a tuple
+        * field selected with the string expression {@code "f6"}.
+        */
+       @Test
+       public void testJoinWithNestedKeyExpression1() throws Exception {
+               final String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
+                               "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
+                               "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<POJO> pojos = CollectionDataSets.getSmallPojoDataSet(env);
+               DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> tuples =
+                       CollectionDataSets.getSmallTuplebasedDataSet(env);
+
+               // join nested pojo against tuple (selected using a string)
+               DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> joined = pojos
+                       .fullOuterJoin(tuples)
+                       .where("nestedPojo.longNumber")
+                       .equalTo("f6")
+                       .with(new ProjectBothFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>());
+
+               compareResultAsTuples(joined.collect(), expected);
+       }
+
+       /**
+        * Full outer join of a nested POJO field ({@code nestedPojo.longNumber}) against a tuple
+        * field selected by its integer position {@code 6}.
+        */
+       @Test
+       public void testJoinWithNestedKeyExpression2() throws Exception {
+               final String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
+                               "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
+                               "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<POJO> pojos = CollectionDataSets.getSmallPojoDataSet(env);
+               DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> tuples =
+                       CollectionDataSets.getSmallTuplebasedDataSet(env);
+
+               // join nested pojo against tuple (selected as an integer)
+               DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> joined = pojos
+                       .fullOuterJoin(tuples)
+                       .where("nestedPojo.longNumber")
+                       .equalTo(6) // <--- difference!
+                       .with(new ProjectBothFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>());
+
+               compareResultAsTuples(joined.collect(), expected);
+       }
+
+       /**
+        * Full outer join that selects multiple key fields on both sides using the expression
+        * language; the environment parallelism is set to 1 before the result is collected.
+        */
+       @Test
+       public void testJoinWithCompositeKeyExpressions() throws Exception {
+               final String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
+                               "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
+                               "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<POJO> pojos = CollectionDataSets.getSmallPojoDataSet(env);
+               DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> tuples =
+                       CollectionDataSets.getSmallTuplebasedDataSet(env);
+
+               // selecting multiple fields using expression language
+               DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> joined = pojos
+                       .fullOuterJoin(tuples)
+                       .where("nestedPojo.longNumber", "number", "str")
+                       .equalTo("f6", "f0", "f1")
+                       .with(new ProjectBothFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>());
+
+               env.setParallelism(1);
+               compareResultAsTuples(joined.collect(), expected);
+       }
+
+       /**
+        * Full outer join whose POJO-side key includes a field nested inside a tuple
+        * ({@code nestedTupleWithCustom.f0}); the environment parallelism is set to 1 before the
+        * result is collected.
+        */
+       @Test
+       public void testNestedIntoTuple() throws Exception {
+               final String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
+                               "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
+                               "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<POJO> pojos = CollectionDataSets.getSmallPojoDataSet(env);
+               DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> tuples =
+                       CollectionDataSets.getSmallTuplebasedDataSet(env);
+
+               // nested into tuple
+               DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> joined = pojos
+                       .fullOuterJoin(tuples)
+                       .where("nestedPojo.longNumber", "number", "nestedTupleWithCustom.f0")
+                       .equalTo("f6", "f0", "f2")
+                       .with(new ProjectBothFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>());
+
+               env.setParallelism(1);
+               compareResultAsTuples(joined.collect(), expected);
+       }
+
+       /**
+        * Full outer join between the small POJO data set and the small tuple data set in which the
+        * POJO-side keys reach through a tuple field into a nested POJO
+        * ("nestedTupleWithCustom.f1.myInt" and "nestedTupleWithCustom.f1.myLong").
+        */
+       @Test
+       public void testNestedIntoTupleIntoPojo() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<POJO> pojos = CollectionDataSets.getSmallPojoDataSet(env);
+               DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> tuples =
+                               CollectionDataSets.getSmallTuplebasedDataSet(env);
+
+               DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> joined = pojos
+                               .fullOuterJoin(tuples)
+                               .where(
+                                               "nestedTupleWithCustom.f0",
+                                               "nestedTupleWithCustom.f1.myInt",
+                                               "nestedTupleWithCustom.f1.myLong")
+                               .equalTo("f2", "f3", "f4")
+                               .with(new ProjectBothFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>());
+
+               env.setParallelism(1);
+               List<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> result =
+                               joined.collect();
+
+               // each expected row pairs a POJO with its tuple; no null-padded rows are expected
+               final String expected =
+                               "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n"
+                               + "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n"
+                               + "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       /**
+        * Non-POJO test verifying that a whole nested tuple can serve as the join key: the left side
+        * selects tuple position 0, the right side names both fields of that nested tuple.
+        */
+       @Test
+       public void testNonPojoToVerifyFullTupleKeys() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<Tuple2<Integer, Integer>, String>> left =
+                               CollectionDataSets.getSmallNestedTupleDataSet(env);
+               DataSet<Tuple2<Tuple2<Integer, Integer>, String>> right =
+                               CollectionDataSets.getSmallNestedTupleDataSet(env);
+
+               DataSet<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String>>> joined = left
+                               .fullOuterJoin(right)
+                               .where(0)
+                               // key is now Tuple2<Integer, Integer>
+                               .equalTo("f0.f0", "f0.f1")
+                               .with(new ProjectBothFunction<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String>>());
+
+               env.setParallelism(1);
+               List<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String>>> result =
+                               joined.collect();
+
+               final String expected =
+                               "((1,1),one),((1,1),one)\n"
+                               + "((2,2),two),((2,2),two)\n"
+                               + "((3,3),three),((3,3),three)\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       /**
+        * Non-POJO test verifying "nested" tuple-element selection: both sides are keyed on the
+        * single field "f0.f0" of the nested tuple.
+        */
+       @Test
+       public void testNonPojoToVerifyNestedTupleElementSelection() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<Tuple2<Integer, Integer>, String>> left =
+                               CollectionDataSets.getSmallNestedTupleDataSet(env);
+               DataSet<Tuple2<Tuple2<Integer, Integer>, String>> right =
+                               CollectionDataSets.getSmallNestedTupleDataSet(env);
+
+               DataSet<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String>>> joined = left
+                               .fullOuterJoin(right)
+                               .where("f0.f0")
+                               // key is now Integer from Tuple2<Integer, Integer>
+                               .equalTo("f0.f0")
+                               .with(new ProjectBothFunction<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String>>());
+
+               env.setParallelism(1);
+               List<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String>>> result =
+                               joined.collect();
+
+               final String expected =
+                               "((1,1),one),((1,1),one)\n"
+                               + "((2,2),two),((2,2),two)\n"
+                               + "((3,3),three),((3,3),three)\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       /**
+        * Full outer join in which both sides use the wildcard key "*": the complete POJO on the
+        * left and the complete tuple on the right.
+        */
+       @Test
+       public void testFullPojoWithFullTuple() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<POJO> pojos = CollectionDataSets.getSmallPojoDataSet(env);
+               DataSet<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> tuples =
+                               CollectionDataSets.getSmallTuplebasedDataSetMatchingPojo(env);
+
+               DataSet<Tuple2<POJO, Tuple7<Long, Integer, Integer, Long, String, Integer, String>>> joined = pojos
+                               .fullOuterJoin(tuples)
+                               .where("*")
+                               .equalTo("*")
+                               .with(new ProjectBothFunction<POJO, Tuple7<Long, Integer, Integer, Long, String, Integer, String>>());
+
+               env.setParallelism(1);
+               List<Tuple2<POJO, Tuple7<Long, Integer, Integer, Long, String, Integer, String>>> result =
+                               joined.collect();
+
+               // each expected row pairs a POJO with its tuple; no null-padded rows are expected
+               final String expected =
+                               "1 First (10,100,1000,One) 10000,(10000,10,100,1000,One,1,First)\n"
+                               + "2 Second (20,200,2000,Two) 20000,(20000,20,200,2000,Two,2,Second)\n"
+                               + "3 Third (30,300,3000,Three) 30000,(30000,30,300,3000,Three,3,Third)\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       /**
+        * Full outer join of a tuple data set (left, keyed on position 0) with a data set of plain
+        * integers (right, keyed on "*"). The expected output contains a null right side for the
+        * tuple whose key is not among the integers.
+        */
+       @Test
+       public void testJoinWithAtomicType1() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Integer> numbers = env.fromElements(1, 2);
+
+               DataSet<Tuple2<Tuple3<Integer, Long, String>, Integer>> joined = tuples
+                               .fullOuterJoin(numbers)
+                               .where(0)
+                               .equalTo("*")
+                               .with(new ProjectBothFunction<Tuple3<Integer, Long, String>, Integer>())
+                               .returns("Tuple2<java.lang.Object,java.lang.Object>");
+
+               List<Tuple2<Tuple3<Integer, Long, String>, Integer>> result = joined.collect();
+
+               final String expected =
+                               "(1,1,Hi),1\n"
+                               + "(2,2,Hello),2\n"
+                               + "(3,2,Hello world),null\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       /**
+        * Full outer join of a data set of plain integers (left, keyed on "*") with a tuple data set
+        * (right, keyed on position 0). The expected output contains a null left side for the tuple
+        * whose key is not among the integers.
+        */
+       @Test
+       public void testJoinWithAtomicType2() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Integer> numbers = env.fromElements(1, 2);
+               DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.getSmall3TupleDataSet(env);
+
+               DataSet<Tuple2<Integer, Tuple3<Integer, Long, String>>> joined = numbers
+                               .fullOuterJoin(tuples)
+                               .where("*")
+                               .equalTo(0)
+                               .with(new ProjectBothFunction<Integer, Tuple3<Integer, Long, String>>())
+                               .returns("Tuple2<java.lang.Object,java.lang.Object>");
+
+               List<Tuple2<Integer, Tuple3<Integer, Long, String>>> result = joined.collect();
+
+               final String expected =
+                               "1,(1,1,Hi)\n"
+                               + "2,(2,2,Hello)\n"
+                               + "null,(3,2,Hello world)\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       private static class T3T5FlatJoin implements 
FlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, 
String, Long>, Tuple2<String, String>> {
+
+               @Override
+               public void join(Tuple3<Integer, Long, String> first,
+                               Tuple5<Integer, Long, Integer, String, Long> 
second,
+                               Collector<Tuple2<String, String>> out) {
+
+                       out.collect(new Tuple2<>(first == null ? null : 
first.f2, second == null ? null : second.f3));
+               }
+
+       }
+
+       private static class T3T5BCJoin extends 
RichFlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, 
Integer, String, Long>, Tuple3<String, String, Integer>> {
+
+               private int broadcast;
+
+               @Override
+               public void open(Configuration config) {
+                       Collection<Integer> ints = 
this.getRuntimeContext().getBroadcastVariable("ints");
+                       int sum = 0;
+                       for (Integer i : ints) {
+                               sum += i;
+                       }
+                       broadcast = sum;
+               }
+
+               @Override
+               public void join(Tuple3<Integer, Long, String> first, 
Tuple5<Integer, Long, Integer, String, Long> second,
+                               Collector<Tuple3<String, String, Integer>> out) 
throws Exception {
+                       out.collect(new Tuple3<>(first == null ? null : 
first.f2, second == null ? null : second.f3, broadcast));
+               }
+       }
+
+       private static class T3CustJoin implements JoinFunction<Tuple3<Integer, 
Long, String>, CustomType, Tuple2<String, String>> {
+
+               @Override
+               public Tuple2<String, String> join(Tuple3<Integer, Long, 
String> first,
+                               CustomType second) {
+
+                       return new Tuple2<>(first == null ? null : first.f2, 
second == null ? null : second.myString);
+               }
+       }
+
+       private static class CustT3Join implements JoinFunction<CustomType, 
Tuple3<Integer, Long, String>, Tuple2<String, String>> {
+
+               @Override
+               public Tuple2<String, String> join(CustomType first, 
Tuple3<Integer, Long, String> second) {
+
+                       return new Tuple2<>(first == null ? null : 
first.myString, second == null ? null : second.f2);
+               }
+       }
+
+       /**
+        * Deliberately untyped join function, which emits a Tuple2 of the left 
and right side.
+        */
+       private static class ProjectBothFunction<IN1, IN2> implements 
JoinFunction<IN1, IN2, Tuple2<IN1, IN2>> {
+               @Override
+               public Tuple2<IN1, IN2> join(IN1 first, IN2 second) throws 
Exception {
+                       return new Tuple2<>(first, second);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/PartitionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/PartitionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/operators/PartitionITCase.java
new file mode 100644
index 0000000..5e88430
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/PartitionITCase.java
@@ -0,0 +1,847 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.AggregateOperator;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets.POJO;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for the DataSet partitioning operators, such as hash partitioning,
+ * range partitioning and rebalancing.
+ */
+@RunWith(Parameterized.class)
+@SuppressWarnings("serial")
+public class PartitionITCase extends MultipleProgramsTestBase {
+
+       /**
+        * Creates the test case for one execution mode.
+        *
+        * @param mode execution mode, passed on to {@link MultipleProgramsTestBase}
+        */
+       public PartitionITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       /**
+        * Hash-partitions the 3-tuple data set on field 1 and collects, per partition, the distinct
+        * values of that field. The expected output lists each key exactly once.
+        */
+       @Test
+       public void testHashPartitionByKeyField() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Long> distinctKeys = tuples
+                               .partitionByHash(1)
+                               .mapPartition(new UniqueTupleLongMapper());
+
+               List<Long> result = distinctKeys.collect();
+
+               final String expected = "1\n2\n3\n4\n5\n6\n";
+               compareResultAsText(result, expected);
+       }
+
+       /**
+        * Range-partitions the 3-tuple data set on field 1 and collects, per partition, the distinct
+        * values of that field. The expected output lists each key exactly once.
+        */
+       @Test
+       public void testRangePartitionByKeyField() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Long> distinctKeys = tuples
+                               .partitionByRange(1)
+                               .mapPartition(new UniqueTupleLongMapper());
+
+               List<Long> result = distinctKeys.collect();
+
+               final String expected = "1\n2\n3\n4\n5\n6\n";
+               compareResultAsText(result, expected);
+       }
+
+       /**
+        * Hash-partitions on two key fields (1 and 2) after shortening the string field with
+        * {@link PrefixMapper}, then groups on the same two fields and sums field 0.
+        */
+       @Test
+       public void testHashPartitionByKeyField2() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);
+               AggregateOperator<Tuple3<Integer, Long, String>> summed = tuples
+                               .map(new PrefixMapper())
+                               .partitionByHash(1, 2)
+                               .groupBy(1, 2)
+                               .sum(0);
+
+               List<Tuple3<Integer, Long, String>> result = summed.collect();
+
+               final String expected =
+                               "(1,1,Hi)\n"
+                               + "(5,2,Hello)\n"
+                               + "(4,3,Hello)\n"
+                               + "(5,3,I am )\n"
+                               + "(6,3,Luke )\n"
+                               + "(34,4,Comme)\n"
+                               + "(65,5,Comme)\n"
+                               + "(111,6,Comme)";
+
+               compareResultAsText(result, expected);
+       }
+
+       /**
+        * Range-partitions on two key fields (1 and 2) after shortening the string field with
+        * {@link PrefixMapper}, then groups on the same two fields and sums field 0.
+        */
+       @Test
+       public void testRangePartitionByKeyField2() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);
+               AggregateOperator<Tuple3<Integer, Long, String>> summed = tuples
+                               .map(new PrefixMapper())
+                               .partitionByRange(1, 2)
+                               .groupBy(1, 2)
+                               .sum(0);
+
+               List<Tuple3<Integer, Long, String>> result = summed.collect();
+
+               final String expected =
+                               "(1,1,Hi)\n"
+                               + "(5,2,Hello)\n"
+                               + "(4,3,Hello)\n"
+                               + "(5,3,I am )\n"
+                               + "(6,3,Luke )\n"
+                               + "(34,4,Comme)\n"
+                               + "(65,5,Comme)\n"
+                               + "(111,6,Comme)";
+
+               compareResultAsText(result, expected);
+       }
+
+       /**
+        * Hash-partitions a data set of plain longs (the sequence 1..6 unioned with itself and
+        * rebalanced) on the wildcard key "*" and collects the distinct values per partition.
+        */
+       @Test
+       public void testHashPartitionOfAtomicType() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Long> distinctValues = env.generateSequence(1, 6)
+                               .union(env.generateSequence(1, 6))
+                               .rebalance()
+                               .partitionByHash("*")
+                               .mapPartition(new UniqueLongMapper());
+
+               List<Long> result = distinctValues.collect();
+
+               final String expected = "1\n2\n3\n4\n5\n6\n";
+               compareResultAsText(result, expected);
+       }
+
+       /**
+        * Range-partitions a data set of plain longs (the sequence 1..6 unioned with itself and
+        * rebalanced) on the wildcard key "*" and collects the distinct values per partition.
+        */
+       @Test
+       public void testRangePartitionOfAtomicType() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Long> distinctValues = env.generateSequence(1, 6)
+                               .union(env.generateSequence(1, 6))
+                               .rebalance()
+                               .partitionByRange("*")
+                               .mapPartition(new UniqueLongMapper());
+
+               List<Long> result = distinctValues.collect();
+
+               final String expected = "1\n2\n3\n4\n5\n6\n";
+               compareResultAsText(result, expected);
+       }
+
+       /**
+        * Hash-partitions the 3-tuple data set using {@link KeySelector1} as the key and collects,
+        * per partition, the distinct values of field 1.
+        */
+       @Test
+       public void testHashPartitionByKeySelector() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Long> distinctKeys = tuples
+                               .partitionByHash(new KeySelector1())
+                               .mapPartition(new UniqueTupleLongMapper());
+
+               List<Long> result = distinctKeys.collect();
+
+               final String expected = "1\n2\n3\n4\n5\n6\n";
+               compareResultAsText(result, expected);
+       }
+
+       private static class PrefixMapper implements 
MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+               @Override
+               public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, 
String> value) throws Exception {
+                       if (value.f2.length() > 5) {
+                               value.f2 = value.f2.substring(0, 5);
+                       }
+                       return value;
+               }
+       }
+
+       /**
+        * Range-partitions the 3-tuple data set using {@link KeySelector1} as the key and collects,
+        * per partition, the distinct values of field 1.
+        */
+       @Test
+       public void testRangePartitionByKeySelector() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Long> distinctKeys = tuples
+                               .partitionByRange(new KeySelector1())
+                               .mapPartition(new UniqueTupleLongMapper());
+
+               List<Long> result = distinctKeys.collect();
+
+               final String expected = "1\n2\n3\n4\n5\n6\n";
+               compareResultAsText(result, expected);
+       }
+
+       /**
+        * Key selector that uses the Long field (f1) of a 3-tuple as the key.
+        */
+       private static class KeySelector1 implements KeySelector<Tuple3<Integer, Long, String>, Long> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Long getKey(Tuple3<Integer, Long, String> value) throws Exception {
+                       final Long key = value.f1;
+                       return key;
+               }
+
+       }
+
+       /**
+        * Verifies that {@code rebalance()} evens out a skewed data set: after filtering and
+        * rebalancing, the (rounded) number of records per subtask must be the same everywhere.
+        */
+       @Test
+       public void testForcedRebalancing() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               // generate some number in parallel
+               DataSet<Long> numbers = env.generateSequence(1, 3000);
+               DataSet<Tuple2<Integer, Integer>> roundedCounts = numbers
+                               // introduce some partition skew by filtering
+                               .filter(new Filter1())
+                               // rebalance
+                               .rebalance()
+                               // count values in each partition
+                               .map(new PartitionIndexMapper())
+                               .groupBy(0)
+                               .reduce(new Reducer1())
+                               // round counts to mitigate runtime scheduling effects (lazy split assignment)
+                               .map(new Mapper1());
+
+               List<Tuple2<Integer, Integer>> result = roundedCounts.collect();
+
+               // 2220 of the 3000 generated values pass the filter (value > 780); the final
+               // division by 10 mirrors the rounding applied by Mapper1
+               final int parallelism = env.getParallelism();
+               final int numPerPartition = 2220 / parallelism / 10;
+
+               StringBuilder expected = new StringBuilder();
+               for (int subtask = 0; subtask < parallelism; subtask++) {
+                       expected.append('(');
+                       expected.append(subtask);
+                       expected.append(',');
+                       expected.append(numPerPartition);
+                       expected.append(")\n");
+               }
+
+               compareResultAsText(result, expected.toString());
+       }
+
+       /**
+        * Filter that keeps only values strictly greater than 780.
+        */
+       private static class Filter1 implements FilterFunction<Long> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public boolean filter(Long value) throws Exception {
+                       final boolean aboveThreshold = value > 780;
+                       return aboveThreshold;
+               }
+       }
+
+       /**
+        * Reduce function that adds up the second fields of two tuples and keeps the first field of
+        * the first tuple.
+        */
+       private static class Reducer1 implements ReduceFunction<Tuple2<Integer, Integer>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) {
+                       final int combinedCount = v1.f1 + v2.f1;
+                       return new Tuple2<>(v1.f0, combinedCount);
+               }
+       }
+
+       /**
+        * Map function that divides the second field by 10 (integer division), modifying and
+        * returning the incoming tuple.
+        */
+       private static class Mapper1 implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
+                       final int rounded = value.f1 / 10;
+                       value.f1 = rounded;
+                       return value;
+               }
+
+       }
+
+       /**
+        * Hash-partitions on field 1 with the partition operator's parallelism (4) set differently
+        * from the environment's parallelism (3).
+        */
+       @Test
+       public void testHashPartitionByKeyFieldAndDifferentParallelism() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(3);
+
+               DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Long> distinctKeys = tuples
+                               .partitionByHash(1).setParallelism(4)
+                               .mapPartition(new UniqueTupleLongMapper());
+
+               List<Long> result = distinctKeys.collect();
+
+               final String expected = "1\n2\n3\n4\n5\n6\n";
+               compareResultAsText(result, expected);
+       }
+
+       /**
+        * Range-partitions on field 1 with the partition operator's parallelism (4) set differently
+        * from the environment's parallelism (3).
+        */
+       @Test
+       public void testRangePartitionByKeyFieldAndDifferentParallelism() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(3);
+
+               DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Long> distinctKeys = tuples
+                               .partitionByRange(1).setParallelism(4)
+                               .mapPartition(new UniqueTupleLongMapper());
+
+               List<Long> result = distinctKeys.collect();
+
+               final String expected = "1\n2\n3\n4\n5\n6\n";
+               compareResultAsText(result, expected);
+       }
+
+       /**
+        * Hash-partitions a POJO data set on the key expression "nestedPojo.longNumber" (operator
+        * parallelism 4, environment parallelism 3) and collects the distinct key values per
+        * partition.
+        */
+       @Test
+       public void testHashPartitionWithKeyExpression() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(3);
+
+               DataSet<POJO> pojos = CollectionDataSets.getDuplicatePojoDataSet(env);
+               DataSet<Long> distinctKeys = pojos
+                               .partitionByHash("nestedPojo.longNumber").setParallelism(4)
+                               .mapPartition(new UniqueNestedPojoLongMapper());
+
+               List<Long> result = distinctKeys.collect();
+
+               final String expected = "10000\n20000\n30000\n";
+               compareResultAsText(result, expected);
+       }
+
+       /**
+        * Range-partitions a POJO data set on the key expression "nestedPojo.longNumber" (operator
+        * parallelism 4, environment parallelism 3) and collects the distinct key values per
+        * partition.
+        */
+       @Test
+       public void testRangePartitionWithKeyExpression() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(3);
+
+               DataSet<POJO> pojos = CollectionDataSets.getDuplicatePojoDataSet(env);
+               DataSet<Long> distinctKeys = pojos
+                               .partitionByRange("nestedPojo.longNumber").setParallelism(4)
+                               .mapPartition(new UniqueNestedPojoLongMapper());
+
+               List<Long> result = distinctKeys.collect();
+
+               final String expected = "10000\n20000\n30000\n";
+               compareResultAsText(result, expected);
+       }
+
+       private static class UniqueTupleLongMapper implements 
MapPartitionFunction<Tuple3<Integer, Long, String>, Long> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void mapPartition(Iterable<Tuple3<Integer, Long, 
String>> records, Collector<Long> out) throws Exception {
+                       HashSet<Long> uniq = new HashSet<>();
+                       for (Tuple3<Integer, Long, String> t : records) {
+                               uniq.add(t.f1);
+                       }
+                       for (Long l : uniq) {
+                               out.collect(l);
+                       }
+               }
+       }
+
+       /**
+        * Map-partition function that emits every distinct Long found in its partition exactly
+        * once.
+        */
+       private static class UniqueLongMapper implements MapPartitionFunction<Long, Long> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void mapPartition(Iterable<Long> longs, Collector<Long> out) throws Exception {
+                       // first pass: de-duplicate the values of this partition
+                       final HashSet<Long> distinctValues = new HashSet<>();
+                       for (Long value : longs) {
+                               distinctValues.add(value);
+                       }
+
+                       // second pass: emit each value once
+                       for (Long value : distinctValues) {
+                               out.collect(value);
+                       }
+               }
+       }
+
+       /**
+        * Map-partition function that emits every distinct {@code nestedPojo.longNumber} found in
+        * its partition exactly once.
+        */
+       private static class UniqueNestedPojoLongMapper implements MapPartitionFunction<POJO, Long> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void mapPartition(Iterable<POJO> records, Collector<Long> out) throws Exception {
+                       // first pass: de-duplicate the nested long numbers of this partition
+                       final HashSet<Long> distinctNumbers = new HashSet<>();
+                       for (POJO pojo : records) {
+                               distinctNumbers.add(pojo.nestedPojo.longNumber);
+                       }
+
+                       // second pass: emit each number once
+                       for (Long number : distinctNumbers) {
+                               out.collect(number);
+                       }
+               }
+       }
+
+       /**
+        * Maps every record to the pair (index of the subtask that processed it, 1).
+        */
+       private static class PartitionIndexMapper extends RichMapFunction<Long, Tuple2<Integer, Integer>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple2<Integer, Integer> map(Long value) throws Exception {
+                       // The input value is not used; only the subtask index taken from the runtime context is reported.
+                       final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+                       return new Tuple2<>(subtaskIndex, 1);
+               }
+       }
+
+       /**
+        * Range-partitions the sequence 0..10000 on the value itself and checks that the (min, max) pairs
+        * reported per partition are consecutive: every range has to start right after the previous one ends.
+        */
+       @Test
+       public void testRangePartitionerOnSequenceData() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSource<Long> dataSource = env.generateSequence(0, 10000);
+               KeySelector<Long, Long> keyExtractor = new ObjectSelfKeySelector();
+
+               MapPartitionFunction<Long, Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(new LongComparator(true));
+
+               // Diamond instead of the raw type, so the assignment is type-checked by the compiler.
+               Comparator<Tuple2<Long, Long>> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true));
+
+               List<Tuple2<Long, Long>> collected = dataSource.partitionByRange(keyExtractor).mapPartition(minMaxSelector).collect();
+               Collections.sort(collected, tuple2Comparator);
+
+               // null means "no partition inspected yet" (same convention as the other range partition tests)
+               Long previousMax = null;
+               for (Tuple2<Long, Long> tuple2 : collected) {
+                       // verified for every partition, including the first one
+                       assertTrue(tuple2.f0 < tuple2.f1);
+                       if (previousMax != null) {
+                               assertEquals(previousMax + 1, tuple2.f0.longValue());
+                       }
+                       previousMax = tuple2.f1;
+               }
+       }
+
+       /**
+        * Builds a delta iteration whose step function range-partitions the workset and expects the
+        * program to fail with an {@link InvalidProgramException}.
+        */
+       @Test(expected = InvalidProgramException.class)
+       public void testRangePartitionInIteration() throws Exception {
+
+               // does not apply for collection execution, so the expected exception is raised directly
+               if (super.mode == TestExecutionMode.COLLECTION) {
+                       throw new InvalidProgramException("Does not apply for collection execution");
+               }
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSource<Long> numbers = env.generateSequence(0, 10000);
+
+               // (n, "n") pairs serve both as initial solution set and as initial workset
+               MapFunction<Long, Tuple2<Long, String>> toPair = new MapFunction<Long, Tuple2<Long, String>>() {
+                       @Override
+                       public Tuple2<Long, String> map(Long v) throws Exception {
+                               return new Tuple2<>(v, Long.toString(v));
+                       }
+               };
+               DataSet<Tuple2<Long, String>> pairs = numbers.map(toPair);
+
+               DeltaIteration<Tuple2<Long, String>, Tuple2<Long, String>> iteration = pairs.iterateDelta(pairs, 10, 0);
+               DataSet<Tuple2<Long, String>> workset = iteration.getWorkset();
+               DataSet<Tuple2<Long, String>> stepResult = workset
+                       .partitionByRange(1) // Verify that range partition is not allowed in iteration
+                       .join(iteration.getSolutionSet())
+                       .where(0).equalTo(0).projectFirst(0).projectSecond(1);
+               DataSet<Tuple2<Long, String>> result = iteration.closeWith(stepResult, stepResult);
+
+               result.collect(); // should fail
+       }
+
+       /**
+        * Range-partitions (value / 5000, value % 5000) pairs on both fields, ascending on the first and
+        * descending on the second, and checks that the per-partition (min, max) ranges do not overlap.
+        */
+       @Test
+       public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000)
+                               .map(new MapFunction<Long, Tuple2<Long, Long>>() {
+                                       @Override
+                                       public Tuple2<Long, Long> map(Long value) throws Exception {
+                                               final long number = value;
+                                               return new Tuple2<>(number / 5000, number % 5000);
+                                       }
+                               });
+
+               // same ordering as requested from the partitioner: first field ascending, second descending
+               final Tuple2Comparator<Long> tuple2Comparator =
+                               new Tuple2Comparator<Long>(new LongComparator(true), new LongComparator(false));
+
+               MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<Tuple2<Long, Long>>(tuple2Comparator);
+
+               final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1)
+                               .withOrders(Order.ASCENDING, Order.DESCENDING)
+                               .mapPartition(minMaxSelector)
+                               .collect();
+
+               final Comparator<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> rangeComparator =
+                               new Tuple2Comparator<Tuple2<Long, Long>>(tuple2Comparator);
+               Collections.sort(collected, rangeComparator);
+
+               Tuple2<Long, Long> previousMax = null;
+               for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> range : collected) {
+                       final Tuple2<Long, Long> currentMin = range.f0;
+                       final Tuple2<Long, Long> currentMax = range.f1;
+
+                       assertTrue("Min element in each partition should be smaller than max.",
+                                       tuple2Comparator.compare(currentMin, currentMax) <= 0);
+
+                       if (previousMax != null) {
+                               assertTrue("Partitions overlap. Previous max should be smaller than current min.",
+                                               tuple2Comparator.compare(previousMax, currentMin) < 0);
+                               if (previousMax.f0.equals(currentMin.f0)) {
+                                       //check that ordering on the second key is correct
+                                       assertEquals("Ordering on the second field should be continous.",
+                                                       previousMax.f1 - 1, currentMin.f1.longValue());
+                               }
+                       }
+                       previousMax = currentMax;
+               }
+       }
+
+       /**
+        * Range-partitions records on a nested tuple key (field 0, ascending), reduces every partition to the
+        * (min, max) pair of its keys and checks that these ranges do not overlap.
+        */
+       @Test
+       public void testRangePartitionerOnSequenceNestedDataWithOrders() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               final DataSet<Tuple2<Tuple2<Long, Long>, Long>> dataSet = env.generateSequence(0, 10000)
+                               .map(new MapFunction<Long, Tuple2<Tuple2<Long, Long>, Long>>() {
+                                       @Override
+                                       public Tuple2<Tuple2<Long, Long>, Long> map(Long value) throws Exception {
+                                               final Tuple2<Long, Long> key = new Tuple2<>(value / 5000, value % 5000);
+                                               return new Tuple2<>(key, value);
+                                       }
+                               });
+
+               final Tuple2Comparator<Long> tuple2Comparator =
+                               new Tuple2Comparator<Long>(new LongComparator(true), new LongComparator(true));
+               MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<Tuple2<Long, Long>>(tuple2Comparator);
+
+               // strips the payload so that only the nested key reaches the min/max selection
+               final MapPartitionFunction<Tuple2<Tuple2<Long, Long>, Long>, Tuple2<Long, Long>> keyOnly =
+                               new MapPartitionFunction<Tuple2<Tuple2<Long, Long>, Long>, Tuple2<Long, Long>>() {
+                                       @Override
+                                       public void mapPartition(Iterable<Tuple2<Tuple2<Long, Long>, Long>> values,
+                                                       Collector<Tuple2<Long, Long>> out) throws Exception {
+                                               for (Tuple2<Tuple2<Long, Long>, Long> record : values) {
+                                                       out.collect(record.f0);
+                                               }
+                                       }
+                               };
+
+               final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0)
+                               .withOrders(Order.ASCENDING)
+                               .mapPartition(keyOnly)
+                               .mapPartition(minMaxSelector)
+                               .collect();
+
+               final Comparator<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> rangeComparator =
+                               new Tuple2Comparator<Tuple2<Long, Long>>(tuple2Comparator);
+               Collections.sort(collected, rangeComparator);
+
+               Tuple2<Long, Long> previousMax = null;
+               for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> range : collected) {
+                       final Tuple2<Long, Long> currentMin = range.f0;
+                       final Tuple2<Long, Long> currentMax = range.f1;
+
+                       assertTrue("Min element in each partition should be smaller than max.",
+                                       tuple2Comparator.compare(currentMin, currentMax) <= 0);
+
+                       if (previousMax != null) {
+                               assertTrue("Partitions overlap. Previous max should be smaller than current min.",
+                                               tuple2Comparator.compare(previousMax, currentMin) < 0);
+                               if (previousMax.f0.equals(currentMin.f0)) {
+                                       assertEquals("Ordering on the second field should be continous.",
+                                                       previousMax.f1 + 1, currentMin.f1.longValue());
+                               }
+                       }
+                       previousMax = currentMax;
+               }
+       }
+
+       @Test
+       public void 
testRangePartitionerWithKeySelectorOnSequenceNestedDataWithOrders() throws 
Exception {
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               final DataSet<Tuple2<ComparablePojo, Long>> dataSet = 
env.generateSequence(0, 10000)
+                               .map(new MapFunction<Long, 
Tuple2<ComparablePojo, Long>>() {
+                                       @Override
+                                       public Tuple2<ComparablePojo, Long> 
map(Long value) throws Exception {
+                                               return new Tuple2<>(new 
ComparablePojo(value / 5000, value % 5000), value);
+                                       }
+                               });
+
+               final List<Tuple2<ComparablePojo, ComparablePojo>> collected = 
dataSet
+                               .partitionByRange(new 
KeySelector<Tuple2<ComparablePojo, Long>, ComparablePojo>() {
+                                       @Override
+                                       public ComparablePojo 
getKey(Tuple2<ComparablePojo, Long> value) throws Exception {
+                                               return value.f0;
+                                       }
+                               })
+                               .withOrders(Order.ASCENDING)
+                               .mapPartition(new MinMaxSelector<>(new 
ComparablePojoComparator()))
+                               .mapPartition(new ExtractComparablePojo())
+                               .collect();
+
+               final Comparator<Tuple2<ComparablePojo, ComparablePojo>> 
pojoComparator =
+                               new Comparator<Tuple2<ComparablePojo, 
ComparablePojo>>() {
+                       @Override
+                       public int compare(Tuple2<ComparablePojo, 
ComparablePojo> o1, Tuple2<ComparablePojo, ComparablePojo> o2) {
+                               return o1.f0.compareTo(o2.f1);
+                       }
+               };
+               Collections.sort(collected, pojoComparator);
+
+               ComparablePojo previousMax = null;
+               for (Tuple2<ComparablePojo, ComparablePojo> element : 
collected) {
+                       assertTrue("Min element in each partition should be 
smaller than max.",
+                                       element.f0.compareTo(element.f1) <= 0);
+                       if (previousMax == null) {
+                               previousMax = element.f1;
+                       } else {
+                               assertTrue("Partitions overlap. Previous max 
should be smaller than current min.",
+                                               
previousMax.compareTo(element.f0) < 0);
+                               if (previousMax.first.equals(element.f0.first)) 
{
+                                       assertEquals("Ordering on the second 
field should be continous.",
+                                                       previousMax.second - 1, 
element.f0.second.longValue());
+                               }
+                               previousMax = element.f1;
+                       }
+               }
+       }
+
+       /**
+        * Reduces each (min record, max record) pair to the pair of the two records' {@link ComparablePojo} keys.
+        */
+       private static class ExtractComparablePojo implements MapPartitionFunction<
+                       Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>,
+                       Tuple2<ComparablePojo, ComparablePojo>> {
+
+               @Override
+               public void mapPartition(Iterable<Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>> values,
+                               Collector<Tuple2<ComparablePojo, ComparablePojo>> out) throws Exception {
+                       for (Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>> minMaxRecords : values) {
+                               // field 0 of each inner record is the key; the Long payload is dropped
+                               final ComparablePojo minKey = minMaxRecords.f0.f0;
+                               final ComparablePojo maxKey = minMaxRecords.f1.f0;
+                               out.collect(new Tuple2<>(minKey, maxKey));
+                       }
+               }
+       }
+
+       /**
+        * Orders (key, payload) records by their {@link ComparablePojo} key only; the payload is not compared.
+        */
+       private static class ComparablePojoComparator implements Comparator<Tuple2<ComparablePojo, Long>>, Serializable {
+
+               @Override
+               public int compare(Tuple2<ComparablePojo, Long> o1, Tuple2<ComparablePojo, Long> o2) {
+                       final ComparablePojo leftKey = o1.f0;
+                       final ComparablePojo rightKey = o2.f0;
+                       return leftKey.compareTo(rightKey);
+               }
+       }
+
+       /**
+        * Key type made of two {@code Long} fields. Instances are ordered ascending by {@code first};
+        * instances with the same {@code first} are ordered descending by {@code second}.
+        */
+       private static class ComparablePojo implements Comparable<ComparablePojo> {
+               private Long first;
+               private Long second;
+
+               public ComparablePojo() {
+               }
+
+               public ComparablePojo(Long first, Long second) {
+                       this.first = first;
+                       this.second = second;
+               }
+
+               public Long getFirst() {
+                       return first;
+               }
+
+               public void setFirst(Long first) {
+                       this.first = first;
+               }
+
+               public Long getSecond() {
+                       return second;
+               }
+
+               public void setSecond(Long second) {
+                       this.second = second;
+               }
+
+               @Override
+               public int compareTo(ComparablePojo o) {
+                       final int byFirst = Long.compare(this.first, o.first);
+                       if (byFirst != 0) {
+                               return byFirst;
+                       }
+
+                       // tie on the first field: the second field decides, in reversed order
+                       return -Long.compare(this.second, o.second);
+               }
+       }
+
+       /**
+        * Key selector that uses the {@code Long} record itself as its key.
+        */
+       private static class ObjectSelfKeySelector implements KeySelector<Long, Long> {
+               @Override
+               public Long getKey(final Long value) throws Exception {
+                       // identity: record and key are the same object
+                       return value;
+               }
+       }
+
+       /**
+        * Reduces a partition to one (min, max) pair, where min and max are determined with the given
+        * comparator. A partition without any element produces no output.
+        *
+        * @param <T> type of the elements in the partition
+        */
+       private static class MinMaxSelector<T> implements MapPartitionFunction<T, Tuple2<T, T>> {
+
+               private final Comparator<T> comparator;
+
+               /**
+                * @param comparator ordering used to decide which element is the smallest and which the largest
+                */
+               public MinMaxSelector(Comparator<T> comparator) {
+                       this.comparator = comparator;
+               }
+
+               @Override
+               public void mapPartition(Iterable<T> values, Collector<Tuple2<T, T>> out) throws Exception {
+                       Iterator<T> itr = values.iterator();
+                       if (!itr.hasNext()) {
+                               // empty partition: there is neither a min nor a max to report,
+                               // and calling next() would throw a NoSuchElementException
+                               return;
+                       }
+
+                       // the first element seeds both extremes
+                       T min = itr.next();
+                       T max = min;
+                       while (itr.hasNext()) {
+                               T value = itr.next();
+                               if (comparator.compare(value, min) < 0) {
+                                       min = value;
+                               }
+                               if (comparator.compare(value, max) > 0) {
+                                       max = value;
+                               }
+                       }
+
+                       out.collect(new Tuple2<>(min, max));
+               }
+       }
+
+       /**
+        * Compares two-field tuples field by field: field {@code f0} with the first comparator and, only if
+        * that yields a tie, field {@code f1} with the second comparator. The result is always -1, 0 or 1.
+        *
+        * @param <T> type of both tuple fields
+        */
+       private static class Tuple2Comparator<T> implements Comparator<Tuple2<T, T>>, Serializable {
+
+               private final Comparator<T> firstComparator;
+               private final Comparator<T> secondComparator;
+
+               /** Uses the same comparator for both fields. */
+               public Tuple2Comparator(Comparator<T> comparator) {
+                       this(comparator, comparator);
+               }
+
+               public Tuple2Comparator(Comparator<T> firstComparator,
+                                                               Comparator<T> secondComparator) {
+                       this.firstComparator = firstComparator;
+                       this.secondComparator = secondComparator;
+               }
+
+               @Override
+               public int compare(Tuple2<T, T> first, Tuple2<T, T> second) {
+                       // signum normalises whatever the delegate returns to -1, 0 or 1
+                       final int byFirstField = Integer.signum(firstComparator.compare(first.f0, second.f0));
+                       if (byFirstField != 0) {
+                               return byFirstField;
+                       }
+
+                       return Integer.signum(secondComparator.compare(first.f1, second.f1));
+               }
+       }
+
+       /**
+        * Compares {@code Long} values in natural order when {@code ascending} is set, in reversed order otherwise.
+        */
+       private static class LongComparator implements Comparator<Long>, Serializable {
+
+               private final boolean ascending;
+
+               public LongComparator(boolean ascending) {
+                       this.ascending = ascending;
+               }
+
+               @Override
+               public int compare(Long o1, Long o2) {
+                       final int natural = Long.compare(o1, o2);
+                       // descending order is the negated natural order
+                       return ascending ? natural : -natural;
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/ProjectITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/ProjectITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/operators/ProjectITCase.java
new file mode 100644
index 0000000..6f52e58
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/ProjectITCase.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+import java.util.List;
+
+/**
+ * Integration tests for {@link DataSet#project}.
+ */
+public class ProjectITCase extends JavaProgramTestBase {
+
+       /**
+        * Projects the fields 3, 4 and 2 (in this order) of the 5-tuple test data set and compares the
+        * collected tuples with the expected rows.
+        */
+       @Override
+       protected void testProgram() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               // Projection with tuple fields indexes.
+               // The typed intermediate variable is what fixes the output type of project(...).
+               final DataSet<Tuple5<Integer, Long, Integer, String, Long>> input = CollectionDataSets.get5TupleDataSet(env);
+               final DataSet<Tuple3<String, Long, Integer>> projected = input.project(3, 4, 2);
+               final List<Tuple3<String, Long, Integer>> actual = projected.collect();
+
+               final String expected =
+                               "Hallo,1,0\n" +
+                               "Hallo Welt,2,1\n" +
+                               "Hallo Welt wie,1,2\n" +
+                               "Hallo Welt wie gehts?,2,3\n" +
+                               "ABC,2,4\n" +
+                               "BCD,3,5\n" +
+                               "CDE,2,6\n" +
+                               "DEF,1,7\n" +
+                               "EFG,1,8\n" +
+                               "FGH,2,9\n" +
+                               "GHI,1,10\n" +
+                               "HIJ,3,11\n" +
+                               "IJK,3,12\n" +
+                               "JKL,2,13\n" +
+                               "KLM,2,14\n";
+
+               compareResultAsTuples(actual, expected);
+       }
+
+}

Reply via email to