http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
deleted file mode 100644
index ba48e12..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ /dev/null
@@ -1,725 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.util;
-
-import java.io.File;
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.hadoop.io.IntWritable;
-
-import scala.math.BigInt;
-
-/**
- * #######################################################################################################
- * 
- *                     BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA. 
- *                     IF YOU MODIFY THE DATA MAKE SURE YOU CHECK THAT ALL TESTS ARE STILL WORKING!
- * 
- * #######################################################################################################
- */
-public class CollectionDataSets {
-
-       public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) {
-
-               List<Tuple3<Integer, Long, String>> data = new ArrayList<>();
-               data.add(new Tuple3<>(1, 1L, "Hi"));
-               data.add(new Tuple3<>(2, 2L, "Hello"));
-               data.add(new Tuple3<>(3, 2L, "Hello world"));
-               data.add(new Tuple3<>(4, 3L, "Hello world, how are you?"));
-               data.add(new Tuple3<>(5, 3L, "I am fine."));
-               data.add(new Tuple3<>(6, 3L, "Luke Skywalker"));
-               data.add(new Tuple3<>(7, 4L, "Comment#1"));
-               data.add(new Tuple3<>(8, 4L, "Comment#2"));
-               data.add(new Tuple3<>(9, 4L, "Comment#3"));
-               data.add(new Tuple3<>(10, 4L, "Comment#4"));
-               data.add(new Tuple3<>(11, 5L, "Comment#5"));
-               data.add(new Tuple3<>(12, 5L, "Comment#6"));
-               data.add(new Tuple3<>(13, 5L, "Comment#7"));
-               data.add(new Tuple3<>(14, 5L, "Comment#8"));
-               data.add(new Tuple3<>(15, 5L, "Comment#9"));
-               data.add(new Tuple3<>(16, 6L, "Comment#10"));
-               data.add(new Tuple3<>(17, 6L, "Comment#11"));
-               data.add(new Tuple3<>(18, 6L, "Comment#12"));
-               data.add(new Tuple3<>(19, 6L, "Comment#13"));
-               data.add(new Tuple3<>(20, 6L, "Comment#14"));
-               data.add(new Tuple3<>(21, 6L, "Comment#15"));
-
-               Collections.shuffle(data);
-
-               return env.fromCollection(data);
-       }
-
-       public static DataSet<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(ExecutionEnvironment env) {
-
-               List<Tuple3<Integer, Long, String>> data = new ArrayList<>();
-               data.add(new Tuple3<>(1, 1L, "Hi"));
-               data.add(new Tuple3<>(2, 2L, "Hello"));
-               data.add(new Tuple3<>(3, 2L, "Hello world"));
-
-               Collections.shuffle(data);
-
-               return env.fromCollection(data);
-       }
-
-       public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) {
-
-               List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<>();
-               data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
-               data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
-               data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
-               data.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
-               data.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
-               data.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
-               data.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
-               data.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
-               data.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
-               data.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
-               data.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
-               data.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
-               data.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
-               data.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
-               data.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
-
-               Collections.shuffle(data);
-
-               TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>> type = new TupleTypeInfo<>(
-                               BasicTypeInfo.INT_TYPE_INFO,
-                               BasicTypeInfo.LONG_TYPE_INFO,
-                               BasicTypeInfo.INT_TYPE_INFO,
-                               BasicTypeInfo.STRING_TYPE_INFO,
-                               BasicTypeInfo.LONG_TYPE_INFO
-               );
-
-               return env.fromCollection(data, type);
-       }
-
-       public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> getSmall5TupleDataSet(ExecutionEnvironment env) {
-
-               List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<>();
-               data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
-               data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
-               data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
-
-               Collections.shuffle(data);
-
-               TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>> type = new TupleTypeInfo<>(
-                               BasicTypeInfo.INT_TYPE_INFO,
-                               BasicTypeInfo.LONG_TYPE_INFO,
-                               BasicTypeInfo.INT_TYPE_INFO,
-                               BasicTypeInfo.STRING_TYPE_INFO,
-                               BasicTypeInfo.LONG_TYPE_INFO
-               );
-
-               return env.fromCollection(data, type);
-       }
-
-       public static DataSet<Tuple2<Tuple2<Integer, Integer>, String>> getSmallNestedTupleDataSet(ExecutionEnvironment env) {
-
-               List<Tuple2<Tuple2<Integer, Integer>, String>> data = new ArrayList<>();
-               data.add(new Tuple2<>(new Tuple2<>(1, 1), "one"));
-               data.add(new Tuple2<>(new Tuple2<>(2, 2), "two"));
-               data.add(new Tuple2<>(new Tuple2<>(3, 3), "three"));
-
-               TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, String>> type = new TupleTypeInfo<>(
-                               new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
-                               BasicTypeInfo.STRING_TYPE_INFO
-               );
-
-               return env.fromCollection(data, type);
-       }
-
-       public static DataSet<Tuple2<Tuple2<Integer, Integer>, String>> getGroupSortedNestedTupleDataSet(ExecutionEnvironment env) {
-
-               List<Tuple2<Tuple2<Integer, Integer>, String>> data = new ArrayList<>();
-               data.add(new Tuple2<>(new Tuple2<>(1, 3), "a"));
-               data.add(new Tuple2<>(new Tuple2<>(1, 2), "a"));
-               data.add(new Tuple2<>(new Tuple2<>(2, 1), "a"));
-               data.add(new Tuple2<>(new Tuple2<>(2, 2), "b"));
-               data.add(new Tuple2<>(new Tuple2<>(3, 3), "c"));
-               data.add(new Tuple2<>(new Tuple2<>(3, 6), "c"));
-               data.add(new Tuple2<>(new Tuple2<>(4, 9), "c"));
-
-               TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, String>> type = new TupleTypeInfo<>(
-                               new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
-                               BasicTypeInfo.STRING_TYPE_INFO
-               );
-
-               return env.fromCollection(data, type);
-       }
-
-       public static DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> getGroupSortedNestedTupleDataSet2(ExecutionEnvironment env) {
-
-               List<Tuple3<Tuple2<Integer, Integer>, String, Integer>> data = new ArrayList<>();
-               data.add(new Tuple3<>(new Tuple2<>(1, 3), "a", 2));
-               data.add(new Tuple3<>(new Tuple2<>(1, 2), "a", 1));
-               data.add(new Tuple3<>(new Tuple2<>(2, 1), "a", 3));
-               data.add(new Tuple3<>(new Tuple2<>(2, 2), "b", 4));
-               data.add(new Tuple3<>(new Tuple2<>(3, 3), "c", 5));
-               data.add(new Tuple3<>(new Tuple2<>(3, 6), "c", 6));
-               data.add(new Tuple3<>(new Tuple2<>(4, 9), "c", 7));
-
-               TupleTypeInfo<Tuple3<Tuple2<Integer, Integer>, String, Integer>> type = new TupleTypeInfo<>(
-                               new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
-                               BasicTypeInfo.STRING_TYPE_INFO,
-                               BasicTypeInfo.INT_TYPE_INFO
-               );
-
-               return env.fromCollection(data, type);
-       }
-       
-       public static DataSet<Tuple2<byte[], Integer>> getTuple2WithByteArrayDataSet(ExecutionEnvironment env) {
-               List<Tuple2<byte[], Integer>> data = new ArrayList<>();
-               data.add(new Tuple2<>(new byte[]{0, 4}, 1));
-               data.add(new Tuple2<>(new byte[]{2, 0}, 1));
-               data.add(new Tuple2<>(new byte[]{2, 0, 4}, 4));
-               data.add(new Tuple2<>(new byte[]{2, 1}, 3));
-               data.add(new Tuple2<>(new byte[]{0}, 0));
-               data.add(new Tuple2<>(new byte[]{2, 0}, 1));
-                               
-               TupleTypeInfo<Tuple2<byte[], Integer>> type = new TupleTypeInfo<>(
-                               PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
-                               BasicTypeInfo.INT_TYPE_INFO
-               );
-               
-               return env.fromCollection(data, type);
-       }
-
-       public static DataSet<String> getStringDataSet(ExecutionEnvironment env) {
-
-               List<String> data = new ArrayList<>();
-               data.add("Hi");
-               data.add("Hello");
-               data.add("Hello world");
-               data.add("Hello world, how are you?");
-               data.add("I am fine.");
-               data.add("Luke Skywalker");
-               data.add("Random comment");
-               data.add("LOL");
-
-               Collections.shuffle(data);
-
-               return env.fromCollection(data);
-       }
-
-       public static DataSet<Integer> getIntegerDataSet(ExecutionEnvironment env) {
-
-               List<Integer> data = new ArrayList<>();
-               data.add(1);
-               data.add(2);
-               data.add(2);
-               data.add(3);
-               data.add(3);
-               data.add(3);
-               data.add(4);
-               data.add(4);
-               data.add(4);
-               data.add(4);
-               data.add(5);
-               data.add(5);
-               data.add(5);
-               data.add(5);
-               data.add(5);
-
-               Collections.shuffle(data);
-
-               return env.fromCollection(data);
-       }
-
-       public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env) {
-
-               List<CustomType> data = new ArrayList<>();
-               data.add(new CustomType(1, 0L, "Hi"));
-               data.add(new CustomType(2, 1L, "Hello"));
-               data.add(new CustomType(2, 2L, "Hello world"));
-               data.add(new CustomType(3, 3L, "Hello world, how are you?"));
-               data.add(new CustomType(3, 4L, "I am fine."));
-               data.add(new CustomType(3, 5L, "Luke Skywalker"));
-               data.add(new CustomType(4, 6L, "Comment#1"));
-               data.add(new CustomType(4, 7L, "Comment#2"));
-               data.add(new CustomType(4, 8L, "Comment#3"));
-               data.add(new CustomType(4, 9L, "Comment#4"));
-               data.add(new CustomType(5, 10L, "Comment#5"));
-               data.add(new CustomType(5, 11L, "Comment#6"));
-               data.add(new CustomType(5, 12L, "Comment#7"));
-               data.add(new CustomType(5, 13L, "Comment#8"));
-               data.add(new CustomType(5, 14L, "Comment#9"));
-               data.add(new CustomType(6, 15L, "Comment#10"));
-               data.add(new CustomType(6, 16L, "Comment#11"));
-               data.add(new CustomType(6, 17L, "Comment#12"));
-               data.add(new CustomType(6, 18L, "Comment#13"));
-               data.add(new CustomType(6, 19L, "Comment#14"));
-               data.add(new CustomType(6, 20L, "Comment#15"));
-
-               Collections.shuffle(data);
-
-               return env.fromCollection(data);
-
-       }
-
-       public static DataSet<CustomType> getSmallCustomTypeDataSet(ExecutionEnvironment env) {
-
-               List<CustomType> data = new ArrayList<>();
-               data.add(new CustomType(1, 0L, "Hi"));
-               data.add(new CustomType(2, 1L, "Hello"));
-               data.add(new CustomType(2, 2L, "Hello world"));
-
-               Collections.shuffle(data);
-
-               return env.fromCollection(data);
-
-       }
-
-       public static class CustomType implements Serializable {
-
-               private static final long serialVersionUID = 1L;
-
-               public int myInt;
-               public long myLong;
-               public String myString;
-
-               public CustomType() {
-               }
-
-               public CustomType(int i, long l, String s) {
-                       myInt = i;
-                       myLong = l;
-                       myString = s;
-               }
-
-               @Override
-               public String toString() {
-                       return myInt + "," + myLong + "," + myString;
-               }
-       }
-
-       public static class CustomTypeComparator implements Comparator<CustomType> {
-               @Override
-               public int compare(CustomType o1, CustomType o2) {
-                       int diff = o1.myInt - o2.myInt;
-                       if (diff != 0) {
-                               return diff;
-                       }
-                       diff = (int) (o1.myLong - o2.myLong);
-                       return diff != 0 ? diff : o1.myString.compareTo(o2.myString);
-               }
-
-       }
-
-       public static DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> getSmallTuplebasedDataSet(ExecutionEnvironment env) {
-               List<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> data = new ArrayList<>();
-               data.add(new Tuple7<>(1, "First", 10, 100, 1000L, "One", 10000L));
-               data.add(new Tuple7<>(2, "Second", 20, 200, 2000L, "Two", 20000L));
-               data.add(new Tuple7<>(3, "Third", 30, 300, 3000L, "Three", 30000L));
-               return env.fromCollection(data);
-       }
-       
-       public static DataSet<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env) {
-               List<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> data = new ArrayList<>();
-               data.add(new Tuple7<>(10000L, 10, 100, 1000L, "One", 1, "First"));
-               data.add(new Tuple7<>(20000L, 20, 200, 2000L, "Two", 2, "Second"));
-               data.add(new Tuple7<>(30000L, 30, 300, 3000L, "Three", 3, "Third"));
-               
-               return env.fromCollection(data);
-       }
-
-       public static DataSet<POJO> getSmallPojoDataSet(ExecutionEnvironment env) {
-               List<POJO> data = new ArrayList<>();
-               data.add(new POJO(1 /*number*/, "First" /*str*/, 10 /*f0*/, 100/*f1.myInt*/, 1000L/*f1.myLong*/, "One" /*f1.myString*/, 10000L /*nestedPojo.longNumber*/));
-               data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L));
-               data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L));
-               return env.fromCollection(data);
-       }
-
-       public static DataSet<POJO> getDuplicatePojoDataSet(ExecutionEnvironment env) {
-               List<POJO> data = new ArrayList<>();
-               data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)); // 5x
-               data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
-               data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
-               data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
-               data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
-               data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L));
-               data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L)); // 2x
-               data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L));
-               return env.fromCollection(data);
-       }
-
-       public static DataSet<POJO> getMixedPojoDataSet(ExecutionEnvironment env) {
-               List<POJO> data = new ArrayList<>();
-               data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10100L)); // 5x
-               data.add(new POJO(2, "First_", 10, 105, 1000L, "One", 10200L));
-               data.add(new POJO(3, "First", 11, 102, 3000L, "One", 10200L));
-               data.add(new POJO(4, "First_", 11, 106, 1000L, "One", 10300L));
-               data.add(new POJO(5, "First", 11, 102, 2000L, "One", 10100L));
-               data.add(new POJO(6, "Second_", 20, 200, 2000L, "Two", 10100L));
-               data.add(new POJO(7, "Third", 31, 301, 2000L, "Three", 10200L)); // 2x
-               data.add(new POJO(8, "Third_", 30, 300, 1000L, "Three", 10100L));
-               return env.fromCollection(data);
-       }
-
-       public static class POJO {
-               public int number;
-               public String str;
-               public Tuple2<Integer, CustomType> nestedTupleWithCustom;
-               public NestedPojo nestedPojo;
-               public transient Long ignoreMe;
-
-               public POJO(int i0, String s0,
-                                       int i1, int i2, long l0, String s1,
-                                       long l1) {
-                       this.number = i0;
-                       this.str = s0;
-                       this.nestedTupleWithCustom = new Tuple2<>(i1, new CustomType(i2, l0, s1));
-                       this.nestedPojo = new NestedPojo();
-                       this.nestedPojo.longNumber = l1;
-               }
-
-               public POJO() {
-               }
-
-               @Override
-               public String toString() {
-                       return number + " " + str + " " + nestedTupleWithCustom + " " + nestedPojo.longNumber;
-               }
-       }
-
-       public static class NestedPojo {
-               public static Object ignoreMe;
-               public long longNumber;
-
-               public NestedPojo() {
-               }
-       }
-
-       public static DataSet<CrazyNested> getCrazyNestedDataSet(ExecutionEnvironment env) {
-               List<CrazyNested> data = new ArrayList<>();
-               data.add(new CrazyNested("aa"));
-               data.add(new CrazyNested("bb"));
-               data.add(new CrazyNested("bb"));
-               data.add(new CrazyNested("cc"));
-               data.add(new CrazyNested("cc"));
-               data.add(new CrazyNested("cc"));
-               return env.fromCollection(data);
-       }
-
-       public static class CrazyNested {
-               public CrazyNestedL1 nest_Lvl1;
-               public Long something; // test proper null-value handling
-
-               public CrazyNested() {
-               }
-
-               public CrazyNested(String set, String second, long s) { // additional CTor to set all fields to non-null values
-                       this(set);
-                       something = s;
-                       nest_Lvl1.a = second;
-               }
-
-               public CrazyNested(String set) {
-                       nest_Lvl1 = new CrazyNestedL1();
-                       nest_Lvl1.nest_Lvl2 = new CrazyNestedL2();
-                       nest_Lvl1.nest_Lvl2.nest_Lvl3 = new CrazyNestedL3();
-                       nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4 = new CrazyNestedL4();
-                       nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal = set;
-               }
-       }
-
-       public static class CrazyNestedL1 {
-               public String a;
-               public int b;
-               public CrazyNestedL2 nest_Lvl2;
-       }
-
-       public static class CrazyNestedL2 {
-               public CrazyNestedL3 nest_Lvl3;
-       }
-
-       public static class CrazyNestedL3 {
-               public CrazyNestedL4 nest_Lvl4;
-       }
-
-       public static class CrazyNestedL4 {
-               public String f1nal;
-       }
-
-       // Copied from TypeExtractorTest
-       public static class FromTuple extends Tuple3<String, String, Long> {
-               private static final long serialVersionUID = 1L;
-               public int special;
-       }
-
-       public static class FromTupleWithCTor extends FromTuple {
-
-               private static final long serialVersionUID = 1L;
-
-               public FromTupleWithCTor() {}
-
-               public FromTupleWithCTor(int special, long tupleField) {
-                       this.special = special;
-                       this.setField(tupleField, 2);
-               }
-       }
-
-       public static DataSet<FromTupleWithCTor> getPojoExtendingFromTuple(ExecutionEnvironment env) {
-               List<FromTupleWithCTor> data = new ArrayList<>();
-               data.add(new FromTupleWithCTor(1, 10L)); // 3x
-               data.add(new FromTupleWithCTor(1, 10L));
-               data.add(new FromTupleWithCTor(1, 10L));
-               data.add(new FromTupleWithCTor(2, 20L)); // 2x
-               data.add(new FromTupleWithCTor(2, 20L));
-               return env.fromCollection(data);
-       }
-
-       public static class PojoContainingTupleAndWritable {
-               public int someInt;
-               public String someString;
-               public IntWritable hadoopFan;
-               public Tuple2<Long, Long> theTuple;
-
-               public PojoContainingTupleAndWritable() {
-               }
-
-               public PojoContainingTupleAndWritable(int i, long l1, long l2) {
-                       hadoopFan = new IntWritable(i);
-                       someInt = i;
-                       theTuple = new Tuple2<>(l1, l2);
-               }
-       }
-
-       public static DataSet<PojoContainingTupleAndWritable> getPojoContainingTupleAndWritable(ExecutionEnvironment env) {
-               List<PojoContainingTupleAndWritable> data = new ArrayList<>();
-               data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 1x
-               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 5x
-               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
-               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
-               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
-               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
-               return env.fromCollection(data);
-       }
-
-
-
-       public static DataSet<PojoContainingTupleAndWritable> getGroupSortedPojoContainingTupleAndWritable(ExecutionEnvironment env) {
-               List<PojoContainingTupleAndWritable> data = new ArrayList<>();
-               data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 1x
-               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 5x
-               data.add(new PojoContainingTupleAndWritable(2, 20L, 201L));
-               data.add(new PojoContainingTupleAndWritable(2, 30L, 200L));
-               data.add(new PojoContainingTupleAndWritable(2, 30L, 600L));
-               data.add(new PojoContainingTupleAndWritable(2, 30L, 400L));
-               return env.fromCollection(data);
-       }
-
-       public static DataSet<Tuple3<Integer, CrazyNested, POJO>> getTupleContainingPojos(ExecutionEnvironment env) {
-               List<Tuple3<Integer, CrazyNested, POJO>> data = new ArrayList<>();
-               data.add(new Tuple3<>(1, new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); // 3x
-               data.add(new Tuple3<>(1, new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L)));
-               data.add(new Tuple3<>(1, new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L)));
-               // POJO is not initialized according to the first two fields.
-               data.add(new Tuple3<>(2, new CrazyNested("two", "duo", 2L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); // 1x
-               return env.fromCollection(data);
-       }
-
-       public static class Pojo1 {
-               public String a;
-               public String b;
-
-               public Pojo1() {}
-
-               public Pojo1(String a, String b) {
-                       this.a = a;
-                       this.b = b;
-               }
-       }
-
-       public static class Pojo2 {
-               public String a2;
-               public String b2;
-       }
-
-       public static class PojoWithMultiplePojos {
-               public Pojo1 p1;
-               public Pojo2 p2;
-               public Integer i0;
-
-               public PojoWithMultiplePojos() {
-               }
-
-               public PojoWithMultiplePojos(String a, String b, String a1, String b1, Integer i0) {
-                       p1 = new Pojo1();
-                       p1.a = a;
-                       p1.b = b;
-                       p2 = new Pojo2();
-                       p2.a2 = a1;
-                       p2.b2 = b1;
-                       this.i0 = i0;
-               }
-       }
-
-       public static DataSet<PojoWithMultiplePojos> getPojoWithMultiplePojos(ExecutionEnvironment env) {
-               List<PojoWithMultiplePojos> data = new ArrayList<>();
-               data.add(new PojoWithMultiplePojos("a", "aa", "b", "bb", 1));
-               data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
-               data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
-               data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
-               data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3));
-               data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3));
-               return env.fromCollection(data);
-       }
-
-       public enum Category {
-               CAT_A, CAT_B
-       }
-
-       public static class PojoWithDateAndEnum {
-               public String group;
-               public Date date;
-               public Category cat;
-       }
-       
-       public static DataSet<PojoWithDateAndEnum> getPojoWithDateAndEnum(ExecutionEnvironment env) {
-               List<PojoWithDateAndEnum> data = new ArrayList<>();
-               
-               PojoWithDateAndEnum one = new PojoWithDateAndEnum();
-               one.group = "a"; one.date = new Date(666); one.cat = Category.CAT_A;
-               data.add(one);
-               
-               PojoWithDateAndEnum two = new PojoWithDateAndEnum();
-               two.group = "a"; two.date = new Date(666); two.cat = Category.CAT_A;
-               data.add(two);
-               
-               PojoWithDateAndEnum three = new PojoWithDateAndEnum();
-               three.group = "b"; three.date = new Date(666); three.cat = Category.CAT_B;
-               data.add(three);
-               
-               return env.fromCollection(data);
-       }
-
-       public static class PojoWithCollection {
-               public List<Pojo1> pojos;
-               public int key;
-               public java.sql.Date sqlDate;
-               public BigInteger bigInt;
-               public BigDecimal bigDecimalKeepItNull;
-               public BigInt scalaBigInt;
-               public List<Object> mixed;
-
-               @Override
-               public String toString() {
-                       return "PojoWithCollection{" +
-                                       "pojos.size()=" + pojos.size() +
-                                       ", key=" + key +
-                                       ", sqlDate=" + sqlDate +
-                                       ", bigInt=" + bigInt +
-                                       ", bigDecimalKeepItNull=" + bigDecimalKeepItNull +
-                                       ", scalaBigInt=" + scalaBigInt +
-                                       ", mixed=" + mixed +
-                                       '}';
-               }
-       }
-
-       public static class PojoWithCollectionGeneric {
-               public List<Pojo1> pojos;
-               public int key;
-               public java.sql.Date sqlDate;
-               public BigInteger bigInt;
-               public BigDecimal bigDecimalKeepItNull;
-               public BigInt scalaBigInt;
-               public List<Object> mixed;
-               private PojoWithDateAndEnum makeMeGeneric;
-
-               @Override
-               public String toString() {
-                       return "PojoWithCollection{" +
-                                       "pojos.size()=" + pojos.size() +
-                                       ", key=" + key +
-                                       ", sqlDate=" + sqlDate +
-                                       ", bigInt=" + bigInt +
-                                       ", bigDecimalKeepItNull=" + bigDecimalKeepItNull +
-                                       ", scalaBigInt=" + scalaBigInt +
-                                       ", mixed=" + mixed +
-                                       '}';
-               }
-       }
-
-       public static DataSet<PojoWithCollection> getPojoWithCollection(ExecutionEnvironment env) {
-               List<PojoWithCollection> data = new ArrayList<>();
-
-               List<Pojo1> pojosList1 = new ArrayList<>();
-               pojosList1.add(new Pojo1("a", "aa"));
-               pojosList1.add(new Pojo1("b", "bb"));
-
-               List<Pojo1> pojosList2 = new ArrayList<>();
-               pojosList2.add(new Pojo1("a2", "aa2"));
-               pojosList2.add(new Pojo1("b2", "bb2"));
-
-               PojoWithCollection pwc1 = new PojoWithCollection();
-               pwc1.pojos = pojosList1;
-               pwc1.key = 0;
-               pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
-               pwc1.scalaBigInt = BigInt.int2bigInt(10);
-               pwc1.bigDecimalKeepItNull = null;
-               
-               // use calendar to make it stable across time zones
-               GregorianCalendar gcl1 = new GregorianCalendar(2033, 4, 18);
-               pwc1.sqlDate = new java.sql.Date(gcl1.getTimeInMillis());
-               pwc1.mixed = new ArrayList<>();
-               Map<String, Integer> map = new HashMap<>();
-               map.put("someKey", 1); // map.put("anotherKey", 2); map.put("third", 3);
-               pwc1.mixed.add(map);
-               pwc1.mixed.add(new File("/this/is/wrong"));
-               pwc1.mixed.add("uhlala");
-
-               PojoWithCollection pwc2 = new PojoWithCollection();
-               pwc2.pojos = pojosList2;
-               pwc2.key = 0;
-               pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
-               pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
-               pwc2.bigDecimalKeepItNull = null;
-               
-               GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
-               pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 1976
-
-
-               data.add(pwc1);
-               data.add(pwc2);
-
-               return env.fromCollection(data);
-       }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java
deleted file mode 100644
index 04a7bc5..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java
+++ /dev/null
@@ -1,730 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.util;
-
-import java.io.File;
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.ValueTypeInfo;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.StringValue;
-import org.apache.hadoop.io.IntWritable;
-
-import scala.math.BigInt;
-
-/**
- * 
#######################################################################################################
- * 
- *                     BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA. 
- *                     IF YOU MODIFY THE DATA MAKE SURE YOU CHECK THAT ALL 
TESTS ARE STILL WORKING!
- * 
- * 
#######################################################################################################
- */
-public class ValueCollectionDataSets {
-
-       public static DataSet<Tuple3<IntValue, LongValue, StringValue>> 
get3TupleDataSet(ExecutionEnvironment env) {
-               List<Tuple3<IntValue, LongValue, StringValue>> data = new 
ArrayList<>();
-
-               data.add(new Tuple3<>(new IntValue(1), new LongValue(1l), new 
StringValue("Hi")));
-               data.add(new Tuple3<>(new IntValue(2), new LongValue(2l), new 
StringValue("Hello")));
-               data.add(new Tuple3<>(new IntValue(3), new LongValue(2l), new 
StringValue("Hello world")));
-               data.add(new Tuple3<>(new IntValue(4), new LongValue(3l), new 
StringValue("Hello world, how are you?")));
-               data.add(new Tuple3<>(new IntValue(5), new LongValue(3l), new 
StringValue("I am fine.")));
-               data.add(new Tuple3<>(new IntValue(6), new LongValue(3l), new 
StringValue("Luke Skywalker")));
-               data.add(new Tuple3<>(new IntValue(7), new LongValue(4l), new 
StringValue("Comment#1")));
-               data.add(new Tuple3<>(new IntValue(8), new LongValue(4l), new 
StringValue("Comment#2")));
-               data.add(new Tuple3<>(new IntValue(9), new LongValue(4l), new 
StringValue("Comment#3")));
-               data.add(new Tuple3<>(new IntValue(10), new LongValue(4l), new 
StringValue("Comment#4")));
-               data.add(new Tuple3<>(new IntValue(11), new LongValue(5l), new 
StringValue("Comment#5")));
-               data.add(new Tuple3<>(new IntValue(12), new LongValue(5l), new 
StringValue("Comment#6")));
-               data.add(new Tuple3<>(new IntValue(13), new LongValue(5l), new 
StringValue("Comment#7")));
-               data.add(new Tuple3<>(new IntValue(14), new LongValue(5l), new 
StringValue("Comment#8")));
-               data.add(new Tuple3<>(new IntValue(15), new LongValue(5l), new 
StringValue("Comment#9")));
-               data.add(new Tuple3<>(new IntValue(16), new LongValue(6l), new 
StringValue("Comment#10")));
-               data.add(new Tuple3<>(new IntValue(17), new LongValue(6l), new 
StringValue("Comment#11")));
-               data.add(new Tuple3<>(new IntValue(18), new LongValue(6l), new 
StringValue("Comment#12")));
-               data.add(new Tuple3<>(new IntValue(19), new LongValue(6l), new 
StringValue("Comment#13")));
-               data.add(new Tuple3<>(new IntValue(20), new LongValue(6l), new 
StringValue("Comment#14")));
-               data.add(new Tuple3<>(new IntValue(21), new LongValue(6l), new 
StringValue("Comment#15")));
-
-               Collections.shuffle(data);
-
-               return env.fromCollection(data);
-       }
-
-       public static DataSet<Tuple3<IntValue, LongValue, StringValue>> 
getSmall3TupleDataSet(ExecutionEnvironment env) {
-               List<Tuple3<IntValue, LongValue, StringValue>> data = new 
ArrayList<>();
-
-               data.add(new Tuple3<>(new IntValue(1), new LongValue(1l), new 
StringValue("Hi")));
-               data.add(new Tuple3<>(new IntValue(2), new LongValue(2l), new 
StringValue("Hello")));
-               data.add(new Tuple3<>(new IntValue(3), new LongValue(2l), new 
StringValue("Hello world")));
-
-               Collections.shuffle(data);
-
-               return env.fromCollection(data);
-       }
-
-       public static DataSet<Tuple5<IntValue, LongValue, IntValue, 
StringValue, LongValue>> get5TupleDataSet(ExecutionEnvironment env) {
-               List<Tuple5<IntValue, LongValue, IntValue, StringValue, 
LongValue>> data = new ArrayList<>();
-
-               data.add(new Tuple5<>(new IntValue(1), new LongValue(1l), new 
IntValue(0), new StringValue("Hallo"), new LongValue(1l)));
-               data.add(new Tuple5<>(new IntValue(2), new LongValue(2l), new 
IntValue(1), new StringValue("Hallo Welt"), new LongValue(2l)));
-               data.add(new Tuple5<>(new IntValue(2), new LongValue(3l), new 
IntValue(2), new StringValue("Hallo Welt wie"), new LongValue(1l)));
-               data.add(new Tuple5<>(new IntValue(3), new LongValue(4l), new 
IntValue(3), new StringValue("Hallo Welt wie gehts?"), new LongValue(2l)));
-               data.add(new Tuple5<>(new IntValue(3), new LongValue(5l), new 
IntValue(4), new StringValue("ABC"), new LongValue(2l)));
-               data.add(new Tuple5<>(new IntValue(3), new LongValue(6l), new 
IntValue(5), new StringValue("BCD"), new LongValue(3l)));
-               data.add(new Tuple5<>(new IntValue(4), new LongValue(7l), new 
IntValue(6), new StringValue("CDE"), new LongValue(2l)));
-               data.add(new Tuple5<>(new IntValue(4), new LongValue(8l), new 
IntValue(7), new StringValue("DEF"), new LongValue(1l)));
-               data.add(new Tuple5<>(new IntValue(4), new LongValue(9l), new 
IntValue(8), new StringValue("EFG"), new LongValue(1l)));
-               data.add(new Tuple5<>(new IntValue(4), new LongValue(10l), new 
IntValue(9), new StringValue("FGH"), new LongValue(2l)));
-               data.add(new Tuple5<>(new IntValue(5), new LongValue(11l), new 
IntValue(10), new StringValue("GHI"), new LongValue(1l)));
-               data.add(new Tuple5<>(new IntValue(5), new LongValue(12l), new 
IntValue(11), new StringValue("HIJ"), new LongValue(3l)));
-               data.add(new Tuple5<>(new IntValue(5), new LongValue(13l), new 
IntValue(12), new StringValue("IJK"), new LongValue(3l)));
-               data.add(new Tuple5<>(new IntValue(5), new LongValue(14l), new 
IntValue(13), new StringValue("JKL"), new LongValue(2l)));
-               data.add(new Tuple5<>(new IntValue(5), new LongValue(15l), new 
IntValue(14), new StringValue("KLM"), new LongValue(2l)));
-
-               Collections.shuffle(data);
-
-               TupleTypeInfo<Tuple5<IntValue, LongValue, IntValue, 
StringValue, LongValue>> type = new
-                       TupleTypeInfo<>(
-                               ValueTypeInfo.INT_VALUE_TYPE_INFO,
-                               ValueTypeInfo.LONG_VALUE_TYPE_INFO,
-                               ValueTypeInfo.INT_VALUE_TYPE_INFO,
-                               ValueTypeInfo.STRING_VALUE_TYPE_INFO,
-                               ValueTypeInfo.LONG_VALUE_TYPE_INFO
-               );
-
-               return env.fromCollection(data, type);
-       }
-
-       public static DataSet<Tuple5<IntValue, LongValue, IntValue, 
StringValue, LongValue>> getSmall5TupleDataSet(ExecutionEnvironment env) {
-               List<Tuple5<IntValue, LongValue, IntValue, StringValue, 
LongValue>> data = new ArrayList<>();
-
-               data.add(new Tuple5<>(new IntValue(1), new LongValue(1l), new 
IntValue(0), new StringValue("Hallo"), new LongValue(1l)));
-               data.add(new Tuple5<>(new IntValue(2), new LongValue(2l), new 
IntValue(1), new StringValue("Hallo Welt"), new LongValue(2l)));
-               data.add(new Tuple5<>(new IntValue(2), new LongValue(3l), new 
IntValue(2), new StringValue("Hallo Welt wie"), new LongValue(1l)));
-
-               Collections.shuffle(data);
-
-               TupleTypeInfo<Tuple5<IntValue, LongValue, IntValue, 
StringValue, LongValue>> type = new
-                       TupleTypeInfo<>(
-                               ValueTypeInfo.INT_VALUE_TYPE_INFO,
-                               ValueTypeInfo.LONG_VALUE_TYPE_INFO,
-                               ValueTypeInfo.INT_VALUE_TYPE_INFO,
-                               ValueTypeInfo.STRING_VALUE_TYPE_INFO,
-                               ValueTypeInfo.LONG_VALUE_TYPE_INFO
-               );
-
-               return env.fromCollection(data, type);
-       }
-
-       public static DataSet<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> 
getSmallNestedTupleDataSet(ExecutionEnvironment env) {
-               List<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> data = 
new ArrayList<>();
-
-               data.add(new Tuple2<>(new Tuple2<>(new IntValue(1), new 
IntValue(1)), new StringValue("one")));
-               data.add(new Tuple2<>(new Tuple2<>(new IntValue(2), new 
IntValue(2)), new StringValue("two")));
-               data.add(new Tuple2<>(new Tuple2<>(new IntValue(3), new 
IntValue(3)), new StringValue("three")));
-
-               TupleTypeInfo<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> 
type = new
-                       TupleTypeInfo<>(
-                               new TupleTypeInfo<Tuple2<IntValue, 
IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, 
ValueTypeInfo.INT_VALUE_TYPE_INFO),
-                               ValueTypeInfo.STRING_VALUE_TYPE_INFO
-               );
-
-               return env.fromCollection(data, type);
-       }
-
-       public static DataSet<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> 
getGroupSortedNestedTupleDataSet(ExecutionEnvironment env) {
-               List<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> data = 
new ArrayList<>();
-
-               data.add(new Tuple2<>(new Tuple2<>(new IntValue(1), new 
IntValue(3)), new StringValue("a")));
-               data.add(new Tuple2<>(new Tuple2<>(new IntValue(1), new 
IntValue(2)), new StringValue("a")));
-               data.add(new Tuple2<>(new Tuple2<>(new IntValue(2), new 
IntValue(1)), new StringValue("a")));
-               data.add(new Tuple2<>(new Tuple2<>(new IntValue(2), new 
IntValue(2)), new StringValue("b")));
-               data.add(new Tuple2<>(new Tuple2<>(new IntValue(3), new 
IntValue(3)), new StringValue("c")));
-               data.add(new Tuple2<>(new Tuple2<>(new IntValue(3), new 
IntValue(6)), new StringValue("c")));
-               data.add(new Tuple2<>(new Tuple2<>(new IntValue(4), new 
IntValue(9)), new StringValue("c")));
-
-               TupleTypeInfo<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> 
type = new
-                       TupleTypeInfo<>(
-                               new TupleTypeInfo<Tuple2<IntValue, 
IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, 
ValueTypeInfo.INT_VALUE_TYPE_INFO),
-                               ValueTypeInfo.STRING_VALUE_TYPE_INFO
-               );
-
-               return env.fromCollection(data, type);
-       }
-
-       public static DataSet<Tuple3<Tuple2<IntValue, IntValue>, StringValue, 
IntValue>> getGroupSortedNestedTupleDataSet2(ExecutionEnvironment env) {
-               List<Tuple3<Tuple2<IntValue, IntValue>, StringValue, IntValue>> 
data = new ArrayList<>();
-
-               data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new 
IntValue(1), new IntValue(3)), new StringValue("a"), new IntValue(2)));
-               data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new 
IntValue(1), new IntValue(2)), new StringValue("a"), new IntValue(1)));
-               data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new 
IntValue(2), new IntValue(1)), new StringValue("a"), new IntValue(3)));
-               data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new 
IntValue(2), new IntValue(2)), new StringValue("b"), new IntValue(4)));
-               data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new 
IntValue(3), new IntValue(3)), new StringValue("c"), new IntValue(5)));
-               data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new 
IntValue(3), new IntValue(6)), new StringValue("c"), new IntValue(6)));
-               data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new 
IntValue(4), new IntValue(9)), new StringValue("c"), new IntValue(7)));
-
-               TupleTypeInfo<Tuple3<Tuple2<IntValue, IntValue>, StringValue, 
IntValue>> type = new
-                       TupleTypeInfo<>(
-                               new TupleTypeInfo<Tuple2<IntValue, 
IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, 
ValueTypeInfo.INT_VALUE_TYPE_INFO),
-                               ValueTypeInfo.STRING_VALUE_TYPE_INFO,
-                               ValueTypeInfo.INT_VALUE_TYPE_INFO
-               );
-
-               return env.fromCollection(data, type);
-       }
-       
-       public static DataSet<StringValue> 
getStringDataSet(ExecutionEnvironment env) {
-               List<StringValue> data = new ArrayList<>();
-
-               data.add(new StringValue("Hi"));
-               data.add(new StringValue("Hello"));
-               data.add(new StringValue("Hello world"));
-               data.add(new StringValue("Hello world, how are you?"));
-               data.add(new StringValue("I am fine."));
-               data.add(new StringValue("Luke Skywalker"));
-               data.add(new StringValue("Random comment"));
-               data.add(new StringValue("LOL"));
-
-               Collections.shuffle(data);
-
-               return env.fromCollection(data);
-       }
-
-       public static DataSet<IntValue> getIntDataSet(ExecutionEnvironment env) 
{
-               List<IntValue> data = new ArrayList<>();
-
-               data.add(new IntValue(1));
-               data.add(new IntValue(2));
-               data.add(new IntValue(2));
-               data.add(new IntValue(3));
-               data.add(new IntValue(3));
-               data.add(new IntValue(3));
-               data.add(new IntValue(4));
-               data.add(new IntValue(4));
-               data.add(new IntValue(4));
-               data.add(new IntValue(4));
-               data.add(new IntValue(5));
-               data.add(new IntValue(5));
-               data.add(new IntValue(5));
-               data.add(new IntValue(5));
-               data.add(new IntValue(5));
-
-               Collections.shuffle(data);
-
-               return env.fromCollection(data);
-       }
-
-       public static DataSet<CustomType> 
getCustomTypeDataSet(ExecutionEnvironment env) {
-               List<CustomType> data = new ArrayList<CustomType>();
-
-               data.add(new CustomType(1, 0l, "Hi"));
-               data.add(new CustomType(2, 1l, "Hello"));
-               data.add(new CustomType(2, 2l, "Hello world"));
-               data.add(new CustomType(3, 3l, "Hello world, how are you?"));
-               data.add(new CustomType(3, 4l, "I am fine."));
-               data.add(new CustomType(3, 5l, "Luke Skywalker"));
-               data.add(new CustomType(4, 6l, "Comment#1"));
-               data.add(new CustomType(4, 7l, "Comment#2"));
-               data.add(new CustomType(4, 8l, "Comment#3"));
-               data.add(new CustomType(4, 9l, "Comment#4"));
-               data.add(new CustomType(5, 10l, "Comment#5"));
-               data.add(new CustomType(5, 11l, "Comment#6"));
-               data.add(new CustomType(5, 12l, "Comment#7"));
-               data.add(new CustomType(5, 13l, "Comment#8"));
-               data.add(new CustomType(5, 14l, "Comment#9"));
-               data.add(new CustomType(6, 15l, "Comment#10"));
-               data.add(new CustomType(6, 16l, "Comment#11"));
-               data.add(new CustomType(6, 17l, "Comment#12"));
-               data.add(new CustomType(6, 18l, "Comment#13"));
-               data.add(new CustomType(6, 19l, "Comment#14"));
-               data.add(new CustomType(6, 20l, "Comment#15"));
-
-               Collections.shuffle(data);
-
-               return env.fromCollection(data);
-       }
-
-       public static DataSet<CustomType> 
getSmallCustomTypeDataSet(ExecutionEnvironment env) {
-               List<CustomType> data = new ArrayList<CustomType>();
-
-               data.add(new CustomType(1, 0l, "Hi"));
-               data.add(new CustomType(2, 1l, "Hello"));
-               data.add(new CustomType(2, 2l, "Hello world"));
-
-               Collections.shuffle(data);
-
-               return env.fromCollection(data);
-       }
-
-       public static class CustomType implements Serializable {
-
-               private static final long serialVersionUID = 1L;
-
-               public IntValue myInt;
-               public LongValue myLong;
-               public StringValue myString;
-
-               public CustomType() {
-               }
-
-               public CustomType(int i, long l, String s) {
-                       myInt = new IntValue(i);
-                       myLong = new LongValue(l);
-                       myString = new StringValue(s);
-               }
-
-               @Override
-               public String toString() {
-                       return myInt + "," + myLong + "," + myString;
-               }
-       }
-
-       public static class CustomTypeComparator implements 
Comparator<CustomType> {
-
-               @Override
-               public int compare(CustomType o1, CustomType o2) {
-                       int diff = o1.myInt.getValue() - o2.myInt.getValue();
-                       if (diff != 0) {
-                               return diff;
-                       }
-                       diff = (int) (o1.myLong.getValue() - 
o2.myLong.getValue());
-                       return diff != 0 ? diff : 
o1.myString.getValue().compareTo(o2.myString.getValue());
-               }
-
-       }
-
-       public static DataSet<Tuple7<IntValue, StringValue, IntValue, IntValue, 
LongValue, StringValue, LongValue>> 
getSmallTuplebasedDataSet(ExecutionEnvironment env) {
-               List<Tuple7<IntValue, StringValue, IntValue, IntValue, 
LongValue, StringValue, LongValue>> data = new ArrayList<>();
-               
-               data.add(new Tuple7<>(new IntValue(1), new 
StringValue("First"), new IntValue(10), new IntValue(100), new 
LongValue(1000L), new StringValue("One"), new LongValue(10000L)));
-               data.add(new Tuple7<>(new IntValue(2), new 
StringValue("Second"), new IntValue(20), new IntValue(200), new 
LongValue(2000L), new StringValue("Two"), new LongValue(20000L)));
-               data.add(new Tuple7<>(new IntValue(3), new 
StringValue("Third"), new IntValue(30), new IntValue(300), new 
LongValue(3000L), new StringValue("Three"), new LongValue(30000L)));
-
-               return env.fromCollection(data);
-       }
-       
-       public static DataSet<Tuple7<LongValue, IntValue, IntValue, LongValue, 
StringValue, IntValue, StringValue>> 
getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env) {
-               List<Tuple7<LongValue, IntValue, IntValue, LongValue, 
StringValue, IntValue, StringValue>> data = new ArrayList<>();
-               
-               data.add(new Tuple7<>(new LongValue(10000L), new IntValue(10), 
new IntValue(100), new LongValue(1000L), new StringValue("One"), new 
IntValue(1), new StringValue("First")));
-               data.add(new Tuple7<>(new LongValue(20000L), new IntValue(20), 
new IntValue(200), new LongValue(2000L), new StringValue("Two"), new 
IntValue(2), new StringValue("Second")));
-               data.add(new Tuple7<>(new LongValue(30000L), new IntValue(30), 
new IntValue(300), new LongValue(3000L), new StringValue("Three"), new 
IntValue(3), new StringValue("Third")));
-               
-               return env.fromCollection(data);
-       }
-
-       public static DataSet<POJO> getSmallPojoDataSet(ExecutionEnvironment 
env) {
-               List<POJO> data = new ArrayList<POJO>();
-
-               data.add(new POJO(1 /*number*/, "First" /*str*/, 10 /*f0*/, 
100/*f1.myInt*/, 1000L/*f1.myLong*/, "One" /*f1.myString*/, 10000L 
/*nestedPojo.longNumber*/));
-               data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L));
-               data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L));
-
-               return env.fromCollection(data);
-       }
-
-       public static DataSet<POJO> 
getDuplicatePojoDataSet(ExecutionEnvironment env) {
-               List<POJO> data = new ArrayList<POJO>();
-
-               data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)); 
// 5x
-               data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
-               data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
-               data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
-               data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
-               data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L));
-               data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 
30000L)); // 2x
-               data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L));
-
-               return env.fromCollection(data);
-       }
-
-       public static DataSet<POJO> getMixedPojoDataSet(ExecutionEnvironment 
env) {
-               List<POJO> data = new ArrayList<POJO>();
-
-               data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10100L)); 
// 5x
-               data.add(new POJO(2, "First_", 10, 105, 1000L, "One", 10200L));
-               data.add(new POJO(3, "First", 11, 102, 3000L, "One", 10200L));
-               data.add(new POJO(4, "First_", 11, 106, 1000L, "One", 10300L));
-               data.add(new POJO(5, "First", 11, 102, 2000L, "One", 10100L));
-               data.add(new POJO(6, "Second_", 20, 200, 2000L, "Two", 10100L));
-               data.add(new POJO(7, "Third", 31, 301, 2000L, "Three", 
10200L)); // 2x
-               data.add(new POJO(8, "Third_", 30, 300, 1000L, "Three", 
10100L));
-
-               return env.fromCollection(data);
-       }
-
-       public static class POJO {
-               public IntValue number;
-               public StringValue str;
-               public Tuple2<IntValue, CustomType> nestedTupleWithCustom;
-               public NestedPojo nestedPojo;
-               public transient LongValue ignoreMe;
-
-               public POJO(int i0, String s0,
-                                       int i1, int i2, long l0, String s1,
-                                       long l1) {
-                       this.number = new IntValue(i0);
-                       this.str = new StringValue(s0);
-                       this.nestedTupleWithCustom = new Tuple2<>(new 
IntValue(i1), new CustomType(i2, l0, s1));
-                       this.nestedPojo = new NestedPojo();
-                       this.nestedPojo.longNumber = new LongValue(l1);
-               }
-
-               public POJO() {
-               }
-
-               @Override
-               public String toString() {
-                       return number + " " + str + " " + nestedTupleWithCustom 
+ " " + nestedPojo.longNumber;
-               }
-       }
-
-       public static class NestedPojo {
-               public static Object ignoreMe;
-               public LongValue longNumber;
-
-               public NestedPojo() {
-               }
-       }
-
-       public static DataSet<CrazyNested> 
getCrazyNestedDataSet(ExecutionEnvironment env) {
-               List<CrazyNested> data = new ArrayList<CrazyNested>();
-
-               data.add(new CrazyNested("aa"));
-               data.add(new CrazyNested("bb"));
-               data.add(new CrazyNested("bb"));
-               data.add(new CrazyNested("cc"));
-               data.add(new CrazyNested("cc"));
-               data.add(new CrazyNested("cc"));
-
-               return env.fromCollection(data);
-       }
-
-       public static class CrazyNested {
-               public CrazyNestedL1 nest_Lvl1;
-               public LongValue something; // test proper null-value handling
-
-               public CrazyNested() {
-               }
-
-               public CrazyNested(String set, String second, long s) { // 
additional CTor to set all fields to non-null values
-                       this(set);
-                       something = new LongValue(s);
-                       nest_Lvl1.a = new StringValue(second);
-               }
-
-               public CrazyNested(String set) {
-                       nest_Lvl1 = new CrazyNestedL1();
-                       nest_Lvl1.nest_Lvl2 = new CrazyNestedL2();
-                       nest_Lvl1.nest_Lvl2.nest_Lvl3 = new CrazyNestedL3();
-                       nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4 = new 
CrazyNestedL4();
-                       nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal = new 
StringValue(set);
-               }
-       }
-
-       public static class CrazyNestedL1 {
-               public StringValue a;
-               public IntValue b;
-               public CrazyNestedL2 nest_Lvl2;
-       }
-
-       public static class CrazyNestedL2 {
-               public CrazyNestedL3 nest_Lvl3;
-       }
-
-       public static class CrazyNestedL3 {
-               public CrazyNestedL4 nest_Lvl4;
-       }
-
-       public static class CrazyNestedL4 {
-               public StringValue f1nal;
-       }
-
-       // Copied from TypeExtractorTest
-       public static class FromTuple extends Tuple3<StringValue, StringValue, 
LongValue> {
-               private static final long serialVersionUID = 1L;
-               public IntValue special;
-       }
-
-       public static class FromTupleWithCTor extends FromTuple {
-
-               private static final long serialVersionUID = 1L;
-
-               public FromTupleWithCTor() {}
-
-               public FromTupleWithCTor(int special, long tupleField) {
-                       this.special = new IntValue(special);
-                       this.setField(new LongValue(tupleField), 2);
-               }
-       }
-
-       public static DataSet<FromTupleWithCTor> 
getPojoExtendingFromTuple(ExecutionEnvironment env) {
-               List<FromTupleWithCTor> data = new ArrayList<>();
-               data.add(new FromTupleWithCTor(1, 10L)); // 3x
-               data.add(new FromTupleWithCTor(1, 10L));
-               data.add(new FromTupleWithCTor(1, 10L));
-               data.add(new FromTupleWithCTor(2, 20L)); // 2x
-               data.add(new FromTupleWithCTor(2, 20L));
-               return env.fromCollection(data);
-       }
-
-       public static class PojoContainingTupleAndWritable {
-               public IntValue someInt;
-               public StringValue someString;
-               public IntWritable hadoopFan;
-               public Tuple2<LongValue, LongValue> theTuple;
-
-               public PojoContainingTupleAndWritable() {
-               }
-
-               public PojoContainingTupleAndWritable(int i, long l1, long l2) {
-                       hadoopFan = new IntWritable(i);
-                       someInt = new IntValue(i);
-                       theTuple = new Tuple2<>(new LongValue(l1), new 
LongValue(l2));
-               }
-       }
-
-       public static DataSet<PojoContainingTupleAndWritable> 
getPojoContainingTupleAndWritable(ExecutionEnvironment env) {
-               List<PojoContainingTupleAndWritable> data = new ArrayList<>();
-               data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 
1x
-               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 
5x
-               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
-               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
-               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
-               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
-               return env.fromCollection(data);
-       }
-
-
-
-       public static DataSet<PojoContainingTupleAndWritable> 
getGroupSortedPojoContainingTupleAndWritable(ExecutionEnvironment env) {
-               List<PojoContainingTupleAndWritable> data = new ArrayList<>();
-               data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 
1x
-               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 
5x
-               data.add(new PojoContainingTupleAndWritable(2, 20L, 201L));
-               data.add(new PojoContainingTupleAndWritable(2, 30L, 200L));
-               data.add(new PojoContainingTupleAndWritable(2, 30L, 600L));
-               data.add(new PojoContainingTupleAndWritable(2, 30L, 400L));
-               return env.fromCollection(data);
-       }
-
-       public static DataSet<Tuple3<IntValue, CrazyNested, POJO>> 
getTupleContainingPojos(ExecutionEnvironment env) {
-               List<Tuple3<IntValue, CrazyNested, POJO>> data = new 
ArrayList<>();
-               data.add(new Tuple3<IntValue, CrazyNested, POJO>(new 
IntValue(1), new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 
1000L, "One", 10000L))); // 3x
-               data.add(new Tuple3<IntValue, CrazyNested, POJO>(new 
IntValue(1), new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 
1000L, "One", 10000L)));
-               data.add(new Tuple3<IntValue, CrazyNested, POJO>(new 
IntValue(1), new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 
1000L, "One", 10000L)));
-               // POJO is not initialized according to the first two fields.
-               data.add(new Tuple3<IntValue, CrazyNested, POJO>(new 
IntValue(2), new CrazyNested("two", "duo", 2L), new POJO(1, "First", 10, 100, 
1000L, "One", 10000L))); // 1x
-               return env.fromCollection(data);
-       }
-
-       public static class Pojo1 {
-               public StringValue a;
-               public StringValue b;
-
-               public Pojo1() {}
-
-               public Pojo1(String a, String b) {
-                       this.a = new StringValue(a);
-                       this.b = new StringValue(b);
-               }
-       }
-
-       public static class Pojo2 {
-               public StringValue a2;
-               public StringValue b2;
-       }
-
-       public static class PojoWithMultiplePojos {
-               public Pojo1 p1;
-               public Pojo2 p2;
-               public IntValue i0;
-
-               public PojoWithMultiplePojos() {
-               }
-
-               public PojoWithMultiplePojos(String a, String b, String a1, 
String b1, int i0) {
-                       p1 = new Pojo1();
-                       p1.a = new StringValue(a);
-                       p1.b = new StringValue(b);
-                       p2 = new Pojo2();
-                       p2.a2 = new StringValue(a1);
-                       p2.b2 = new StringValue(b1);
-                       this.i0 = new IntValue(i0);
-               }
-       }
-
-       public static DataSet<PojoWithMultiplePojos> 
getPojoWithMultiplePojos(ExecutionEnvironment env) {
-               List<PojoWithMultiplePojos> data = new ArrayList<>();
-               data.add(new PojoWithMultiplePojos("a", "aa", "b", "bb", 1));
-               data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
-               data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
-               data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
-               data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3));
-               data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3));
-               return env.fromCollection(data);
-       }
-
-       public enum Category {
-               CAT_A, CAT_B;
-       }
-
-       public static class PojoWithDateAndEnum {
-               public StringValue group;
-               public Date date;
-               public Category cat;
-       }
-       
-       public static DataSet<PojoWithDateAndEnum> 
getPojoWithDateAndEnum(ExecutionEnvironment env) {
-               List<PojoWithDateAndEnum> data = new 
ArrayList<PojoWithDateAndEnum>();
-               
-               PojoWithDateAndEnum one = new PojoWithDateAndEnum();
-               one.group = new StringValue("a");
-               one.date = new Date(666);
-               one.cat = Category.CAT_A;
-               data.add(one);
-               
-               PojoWithDateAndEnum two = new PojoWithDateAndEnum();
-               two.group = new StringValue("a");
-               two.date = new Date(666);
-               two.cat = Category.CAT_A;
-               data.add(two);
-               
-               PojoWithDateAndEnum three = new PojoWithDateAndEnum();
-               three.group = new StringValue("b");
-               three.date = new Date(666);
-               three.cat = Category.CAT_B;
-               data.add(three);
-               
-               return env.fromCollection(data);
-       }
-
-       public static class PojoWithCollection {
-               public List<Pojo1> pojos;
-               public IntValue key;
-               public java.sql.Date sqlDate;
-               public BigInteger bigInt;
-               public BigDecimal bigDecimalKeepItNull;
-               public BigInt scalaBigInt;
-               public List<Object> mixed;
-
-               @Override
-               public String toString() {
-                       return "PojoWithCollection{" +
-                                       "pojos.size()=" + pojos.size() +
-                                       ", key=" + key +
-                                       ", sqlDate=" + sqlDate +
-                                       ", bigInt=" + bigInt +
-                                       ", bigDecimalKeepItNull=" + 
bigDecimalKeepItNull +
-                                       ", scalaBigInt=" + scalaBigInt +
-                                       ", mixed=" + mixed +
-                                       '}';
-               }
-       }
-
-       public static class PojoWithCollectionGeneric {
-               public List<Pojo1> pojos;
-               public IntValue key;
-               public java.sql.Date sqlDate;
-               public BigInteger bigInt;
-               public BigDecimal bigDecimalKeepItNull;
-               public BigInt scalaBigInt;
-               public List<Object> mixed;
-               private PojoWithDateAndEnum makeMeGeneric;
-
-               @Override
-               public String toString() {
-                       return "PojoWithCollection{" +
-                                       "pojos.size()=" + pojos.size() +
-                                       ", key=" + key +
-                                       ", sqlDate=" + sqlDate +
-                                       ", bigInt=" + bigInt +
-                                       ", bigDecimalKeepItNull=" + 
bigDecimalKeepItNull +
-                                       ", scalaBigInt=" + scalaBigInt +
-                                       ", mixed=" + mixed +
-                                       '}';
-               }
-       }
-
-       public static DataSet<PojoWithCollection> 
getPojoWithCollection(ExecutionEnvironment env) {
-               List<PojoWithCollection> data = new ArrayList<>();
-
-               List<Pojo1> pojosList1 = new ArrayList<>();
-               pojosList1.add(new Pojo1("a", "aa"));
-               pojosList1.add(new Pojo1("b", "bb"));
-
-               List<Pojo1> pojosList2 = new ArrayList<>();
-               pojosList2.add(new Pojo1("a2", "aa2"));
-               pojosList2.add(new Pojo1("b2", "bb2"));
-
-               PojoWithCollection pwc1 = new PojoWithCollection();
-               pwc1.pojos = pojosList1;
-               pwc1.key = new IntValue(0);
-               pwc1.bigInt = 
BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
-               pwc1.scalaBigInt = BigInt.int2bigInt(10);
-               pwc1.bigDecimalKeepItNull = null;
-               
-               // use calendar to make it stable across time zones
-               GregorianCalendar gcl1 = new GregorianCalendar(2033, 04, 18);
-               pwc1.sqlDate = new java.sql.Date(gcl1.getTimeInMillis());
-               pwc1.mixed = new ArrayList<Object>();
-               Map<StringValue, IntValue> map = new HashMap<>();
-               map.put(new StringValue("someKey"), new IntValue(1));
-               pwc1.mixed.add(map);
-               pwc1.mixed.add(new File("/this/is/wrong"));
-               pwc1.mixed.add("uhlala");
-
-               PojoWithCollection pwc2 = new PojoWithCollection();
-               pwc2.pojos = pojosList2;
-               pwc2.key = new IntValue(0);
-               pwc2.bigInt = 
BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
-               pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
-               pwc2.bigDecimalKeepItNull = null;
-               
-               GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
-               pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 
1976
-
-               data.add(pwc1);
-               data.add(pwc2);
-
-               return env.fromCollection(data);
-       }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java
 
b/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java
index aabe7c0..6413a3b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java
@@ -15,13 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.test.manual;
 
 import org.apache.flink.types.parser.FieldParserTest;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
-
 import org.reflections.Reflections;
 import org.reflections.scanners.MemberUsageScanner;
 import org.reflections.util.ClasspathHelper;
@@ -40,7 +40,7 @@ import static org.junit.Assert.assertEquals;
 
 /**
  * Tests via reflection that certain methods are not called in Flink.
- * 
+ *
  * <p>Forbidden calls include:
  *   - Byte / String conversions that do not specify an explicit charset
  *     because they produce different results in different locales
@@ -116,11 +116,10 @@ public class CheckForbiddenMethodsUsage {
                        .addUrls(ClasspathHelper.forPackage("org.apache.flink"))
                        .addScanners(new MemberUsageScanner()));
 
-
                for (ForbiddenCall forbiddenCall : forbiddenCalls) {
                        final Set<Member> methodUsages = 
forbiddenCall.getUsages(reflections);
                        methodUsages.removeAll(forbiddenCall.getExclusions());
-                       assertEquals("Unexpected calls: " + methodUsages,0, 
methodUsages.size());
+                       assertEquals("Unexpected calls: " + methodUsages, 0, 
methodUsages.size());
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
 
b/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
index 0692196..f02cf1c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
@@ -40,23 +40,26 @@ import java.util.List;
 
 import static org.junit.Assert.fail;
 
+/**
+ * Manual test for growing hash tables.
+ */
 public class HashTableRecordWidthCombinations {
 
        public static void main(String[] args) throws Exception {
 
                @SuppressWarnings("unchecked")
-               final TypeSerializer<Tuple2<Long, byte[]>> buildSerializer = 
+               final TypeSerializer<Tuple2<Long, byte[]>> buildSerializer =
                                new TupleSerializer<Tuple2<Long, byte[]>>(
                                                (Class<Tuple2<Long, byte[]>>) 
(Class<?>) Tuple2.class,
                                                new TypeSerializer<?>[] { 
LongSerializer.INSTANCE, BytePrimitiveArraySerializer.INSTANCE });
-               
+
                final TypeSerializer<Long> probeSerializer = 
LongSerializer.INSTANCE;
 
                final TypeComparator<Tuple2<Long, byte[]>> buildComparator = 
new TupleComparator<Tuple2<Long, byte[]>>(
                                new int[] {0},
                                new TypeComparator<?>[] { new 
LongComparator(true) },
                                new TypeSerializer<?>[] { 
LongSerializer.INSTANCE });
-               
+
                final TypeComparator<Long> probeComparator = new 
LongComparator(true);
 
                final TypePairComparator<Long, Tuple2<Long, byte[]>> 
pairComparator = new TypePairComparator<Long, Tuple2<Long, byte[]>>() {
@@ -85,7 +88,7 @@ public class HashTableRecordWidthCombinations {
                final IOManager ioMan = new IOManagerAsync();
 
                try {
-                       final int pageSize = 32*1024;
+                       final int pageSize = 32 * 1024;
                        final int numSegments = 34;
 
                        for (int num = 3400; num < 3550; num++) {
@@ -151,7 +154,7 @@ public class HashTableRecordWidthCombinations {
                                        try {
                                                while (table.nextRecord()) {
                                                        
MutableObjectIterator<Tuple2<Long, byte[]>> matches = 
table.getBuildSideIterator();
-                                                       while (matches.next() 
!= null);
+                                                       while (matches.next() 
!= null) {}
                                                }
                                        }
                                        catch (RuntimeException e) {
@@ -176,11 +179,11 @@ public class HashTableRecordWidthCombinations {
                        ioMan.shutdown();
                }
        }
-       
+
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------
-       
+
        private static List<MemorySegment> getMemory(int numSegments, int 
segmentSize) {
                ArrayList<MemorySegment> list = new 
ArrayList<MemorySegment>(numSegments);
                for (int i = 0; i < numSegments; i++) {
@@ -188,7 +191,7 @@ public class HashTableRecordWidthCombinations {
                }
                return list;
        }
-       
+
        private static void checkNoTempFilesRemain(IOManager ioManager) {
                for (File dir : ioManager.getSpillingDirectories()) {
                        for (String file : dir.list()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
 
b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
index 9821b05..c69e6fd 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.test.manual;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.Random;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -41,13 +33,24 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.util.MutableObjectIterator;
+
 import org.junit.Assert;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * Test {@link UnilateralSortMerger} on a large set of {@code String}.
+ */
 public class MassiveStringSorting {
 
        private static final long SEED = 347569784659278346L;
-       
-       
+
        public void testStringSorting() {
                File input = null;
                File sorted = null;
@@ -55,12 +58,12 @@ public class MassiveStringSorting {
                try {
                        // the source file
                        input = generateFileWithStrings(300000, 
"http://some-uri.com/that/is/a/common/prefix/to/all");
-                       
+
                        // the sorted file
                        sorted = File.createTempFile("sorted_strings", "txt");
-                       
-                       String[] command = {"/bin/bash","-c","export 
LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + 
sorted.getAbsolutePath() + "\""};
-                       
+
+                       String[] command = {"/bin/bash", "-c", "export 
LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + 
sorted.getAbsolutePath() + "\""};
+
                        Process p = null;
                        try {
                                p = Runtime.getRuntime().exec(command);
@@ -74,37 +77,37 @@ public class MassiveStringSorting {
                                        p.destroy();
                                }
                        }
-                       
+
                        // sort the data
                        UnilateralSortMerger<String> sorter = null;
                        BufferedReader reader = null;
                        BufferedReader verifyReader = null;
-                       
+
                        try {
                                MemoryManager mm = new MemoryManager(1024 * 
1024, 1);
                                IOManager ioMan = new IOManagerAsync();
-                                       
+
                                TypeSerializer<String> serializer = 
StringSerializer.INSTANCE;
                                TypeComparator<String> comparator = new 
StringComparator(true);
-                               
+
                                reader = new BufferedReader(new 
FileReader(input));
                                MutableObjectIterator<String> inputIterator = 
new StringReaderMutableObjectIterator(reader);
-                               
+
                                sorter = new UnilateralSortMerger<String>(mm, 
ioMan, inputIterator, new DummyInvokable(),
                                                new 
RuntimeSerializerFactory<String>(serializer, String.class), comparator, 1.0, 4, 
0.8f,
                                                true /* use large record 
handler */, false);
 
                                MutableObjectIterator<String> sortedData = 
sorter.getIterator();
-                               
+
                                reader.close();
-                               
+
                                // verify
                                verifyReader = new BufferedReader(new 
FileReader(sorted));
                                String next;
-                               
+
                                while ((next = verifyReader.readLine()) != 
null) {
                                        String nextFromStratoSort = 
sortedData.next("");
-                                       
+
                                        
Assert.assertNotNull(nextFromStratoSort);
                                        Assert.assertEquals(next, 
nextFromStratoSort);
                                }
@@ -135,23 +138,23 @@ public class MassiveStringSorting {
                        }
                }
        }
-       
+
        @SuppressWarnings("unchecked")
        public void testStringTuplesSorting() {
-               final int NUM_STRINGS = 300000;
-               
+               final int numStrings = 300000;
+
                File input = null;
                File sorted = null;
 
                try {
                        // the source file
-                       input = generateFileWithStringTuples(NUM_STRINGS, 
"http://some-uri.com/that/is/a/common/prefix/to/all");
-                       
+                       input = generateFileWithStringTuples(numStrings, 
"http://some-uri.com/that/is/a/common/prefix/to/all");
+
                        // the sorted file
                        sorted = File.createTempFile("sorted_strings", "txt");
-                       
-                       String[] command = {"/bin/bash","-c","export 
LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + 
sorted.getAbsolutePath() + "\""};
-                       
+
+                       String[] command = {"/bin/bash", "-c", "export 
LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + 
sorted.getAbsolutePath() + "\""};
+
                        Process p = null;
                        try {
                                p = Runtime.getRuntime().exec(command);
@@ -165,33 +168,31 @@ public class MassiveStringSorting {
                                        p.destroy();
                                }
                        }
-                       
+
                        // sort the data
                        UnilateralSortMerger<Tuple2<String, String[]>> sorter = 
null;
                        BufferedReader reader = null;
                        BufferedReader verifyReader = null;
-                       
+
                        try {
                                MemoryManager mm = new MemoryManager(1024 * 
1024, 1);
                                IOManager ioMan = new IOManagerAsync();
-                                       
-                               TupleTypeInfo<Tuple2<String, String[]>> 
typeInfo = (TupleTypeInfo<Tuple2<String, String[]>>) 
+
+                               TupleTypeInfo<Tuple2<String, String[]>> 
typeInfo = (TupleTypeInfo<Tuple2<String, String[]>>)
                                                TypeInfoParser.<Tuple2<String, 
String[]>>parse("Tuple2<String, String[]>");
 
                                TypeSerializer<Tuple2<String, String[]>> 
serializer = typeInfo.createSerializer(new ExecutionConfig());
                                TypeComparator<Tuple2<String, String[]>> 
comparator = typeInfo.createComparator(new int[] { 0 }, new boolean[] { true }, 
0, new ExecutionConfig());
-                               
+
                                reader = new BufferedReader(new 
FileReader(input));
                                MutableObjectIterator<Tuple2<String, String[]>> 
inputIterator = new StringTupleReaderMutableObjectIterator(reader);
-                               
+
                                sorter = new 
UnilateralSortMerger<Tuple2<String, String[]>>(mm, ioMan, inputIterator, new 
DummyInvokable(),
                                                new 
RuntimeSerializerFactory<Tuple2<String, String[]>>(serializer, 
(Class<Tuple2<String, String[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 
0.8f,
                                                true /* use large record 
handler */, false);
 
-                               
-                               
                                // use this part to verify that all if good 
when sorting in memory
-                               
+
 //                             List<MemorySegment> memory = 
mm.allocatePages(new DummyInvokable(), mm.computeNumberOfPages(1024*1024*1024));
 //                             NormalizedKeySorter<Tuple2<String, String[]>> 
nks = new NormalizedKeySorter<Tuple2<String,String[]>>(serializer, comparator, 
memory);
 //
@@ -200,36 +201,36 @@ public class MassiveStringSorting {
 //                                     while ((wi = inputIterator.next(wi)) != 
null) {
 //                                             
Assert.assertTrue(nks.write(wi));
 //                                     }
-//                                     
+//
 //                                     new QuickSort().sort(nks);
 //                             }
-//                             
+//
 //                             MutableObjectIterator<Tuple2<String, String[]>> 
sortedData = nks.getIterator();
-                               
+
                                MutableObjectIterator<Tuple2<String, String[]>> 
sortedData = sorter.getIterator();
                                reader.close();
-                               
+
                                // verify
                                verifyReader = new BufferedReader(new 
FileReader(sorted));
                                MutableObjectIterator<Tuple2<String, String[]>> 
verifyIterator = new StringTupleReaderMutableObjectIterator(verifyReader);
-                               
+
                                Tuple2<String, String[]> next = new 
Tuple2<String, String[]>("", new String[0]);
                                Tuple2<String, String[]> nextFromStratoSort = 
new Tuple2<String, String[]>("", new String[0]);
-                               
+
                                int num = 0;
-                               
+
                                while ((next = verifyIterator.next(next)) != 
null) {
                                        num++;
-                                       
+
                                        nextFromStratoSort = 
sortedData.next(nextFromStratoSort);
                                        
Assert.assertNotNull(nextFromStratoSort);
-                                       
+
                                        Assert.assertEquals(next.f0, 
nextFromStratoSort.f0);
                                        Assert.assertArrayEquals(next.f1, 
nextFromStratoSort.f1);
                                }
-                               
+
                                
Assert.assertNull(sortedData.next(nextFromStratoSort));
-                               Assert.assertEquals(NUM_STRINGS, num);
+                               Assert.assertEquals(numStrings, num);
 
                        }
                        finally {
@@ -260,15 +261,15 @@ public class MassiveStringSorting {
        }
 
        // 
--------------------------------------------------------------------------------------------
-       
+
        private static final class StringReaderMutableObjectIterator implements 
MutableObjectIterator<String> {
-               
+
                private final BufferedReader reader;
 
                public StringReaderMutableObjectIterator(BufferedReader reader) 
{
                        this.reader = reader;
                }
-               
+
                @Override
                public String next(String reuse) throws IOException {
                        return reader.readLine();
@@ -279,22 +280,22 @@ public class MassiveStringSorting {
                        return reader.readLine();
                }
        }
-       
+
        private static final class StringTupleReaderMutableObjectIterator 
implements MutableObjectIterator<Tuple2<String, String[]>> {
-               
+
                private final BufferedReader reader;
 
                public StringTupleReaderMutableObjectIterator(BufferedReader 
reader) {
                        this.reader = reader;
                }
-               
+
                @Override
                public Tuple2<String, String[]> next(Tuple2<String, String[]> 
reuse) throws IOException {
                        String line = reader.readLine();
                        if (line == null) {
                                return null;
                        }
-                       
+
                        String[] parts = line.split(" ");
                        reuse.f0 = parts[0];
                        reuse.f1 = parts;
@@ -306,31 +307,31 @@ public class MassiveStringSorting {
                        return next(new Tuple2<String, String[]>());
                }
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
-       
+
        private File generateFileWithStrings(int numStrings, String prefix) 
throws IOException {
                final Random rnd = new Random(SEED);
-               
+
                final StringBuilder bld = new StringBuilder();
                final int resetValue = prefix.length();
-               
+
                bld.append(prefix);
-               
+
                File f = File.createTempFile("strings", "txt");
                BufferedWriter wrt = null;
                try {
                        wrt = new BufferedWriter(new FileWriter(f));
-               
-                       for (int i = 0 ; i < numStrings; i++) {
+
+                       for (int i = 0; i < numStrings; i++) {
                                bld.setLength(resetValue);
-                               
+
                                int len = rnd.nextInt(20) + 300;
                                for (int k = 0; k < len; k++) {
                                        char c = (char) (rnd.nextInt(80) + 40);
                                        bld.append(c);
                                }
-                               
+
                                String str = bld.toString();
                                wrt.write(str);
                                wrt.newLine();
@@ -338,52 +339,52 @@ public class MassiveStringSorting {
                } finally {
                        wrt.close();
                }
-               
+
                return f;
        }
-       
+
        private File generateFileWithStringTuples(int numStrings, String 
prefix) throws IOException {
                final Random rnd = new Random(SEED);
-               
+
                final StringBuilder bld = new StringBuilder();
 
                File f = File.createTempFile("strings", "txt");
                BufferedWriter wrt = null;
                try {
                        wrt = new BufferedWriter(new FileWriter(f));
-               
-                       for (int i = 0 ; i < numStrings; i++) {
+
+                       for (int i = 0; i < numStrings; i++) {
                                bld.setLength(0);
-                               
+
                                int numComps = rnd.nextInt(5) + 1;
-                               
+
                                for (int z = 0; z < numComps; z++) {
                                        if (z > 0) {
                                                bld.append(' ');
                                        }
                                        bld.append(prefix);
-                               
+
                                        int len = rnd.nextInt(20) + 10;
                                        for (int k = 0; k < len; k++) {
                                                char c = (char) 
(rnd.nextInt(80) + 40);
                                                bld.append(c);
                                        }
                                }
-                               
+
                                String str = bld.toString();
-                               
+
                                wrt.write(str);
                                wrt.newLine();
                        }
                } finally {
                        wrt.close();
                }
-               
+
                return f;
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
-       
+
        public static void main(String[] args) {
                new MassiveStringSorting().testStringSorting();
                new MassiveStringSorting().testStringTuplesSorting();

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
 
b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
index 9e37b79..453aa14 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.test.manual;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.Random;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -42,12 +34,24 @@ import 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.MutableObjectIterator;
+
 import org.junit.Assert;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * Test {@link UnilateralSortMerger} on a large set of {@link StringValue}.
+ */
 public class MassiveStringValueSorting {
 
        private static final long SEED = 347569784659278346L;
-       
+
        public void testStringValueSorting() {
                File input = null;
                File sorted = null;
@@ -55,12 +59,12 @@ public class MassiveStringValueSorting {
                try {
                        // the source file
                        input = generateFileWithStrings(300000, 
"http://some-uri.com/that/is/a/common/prefix/to/all");
-                       
+
                        // the sorted file
                        sorted = File.createTempFile("sorted_strings", "txt");
-                       
-                       String[] command = {"/bin/bash","-c","export 
LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + 
sorted.getAbsolutePath() + "\""};
-                       
+
+                       String[] command = {"/bin/bash", "-c", "export 
LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + 
sorted.getAbsolutePath() + "\""};
+
                        Process p = null;
                        try {
                                p = Runtime.getRuntime().exec(command);
@@ -74,38 +78,38 @@ public class MassiveStringValueSorting {
                                        p.destroy();
                                }
                        }
-                       
+
                        // sort the data
                        UnilateralSortMerger<StringValue> sorter = null;
                        BufferedReader reader = null;
                        BufferedReader verifyReader = null;
-                       
+
                        try {
                                MemoryManager mm = new MemoryManager(1024 * 
1024, 1);
                                IOManager ioMan = new IOManagerAsync();
-                                       
+
                                TypeSerializer<StringValue> serializer = new 
CopyableValueSerializer<StringValue>(StringValue.class);
                                TypeComparator<StringValue> comparator = new 
CopyableValueComparator<StringValue>(true, StringValue.class);
-                               
+
                                reader = new BufferedReader(new 
FileReader(input));
                                MutableObjectIterator<StringValue> 
inputIterator = new StringValueReaderMutableObjectIterator(reader);
-                               
+
                                sorter = new 
UnilateralSortMerger<StringValue>(mm, ioMan, inputIterator, new 
DummyInvokable(),
                                                new 
RuntimeSerializerFactory<StringValue>(serializer, StringValue.class), 
comparator, 1.0, 4, 0.8f,
                                                true /* use large record 
handler */, true);
 
                                MutableObjectIterator<StringValue> sortedData = 
sorter.getIterator();
-                               
+
                                reader.close();
-                               
+
                                // verify
                                verifyReader = new BufferedReader(new 
FileReader(sorted));
                                String nextVerify;
                                StringValue nextFromFlinkSort = new 
StringValue();
-                               
+
                                while ((nextVerify = verifyReader.readLine()) 
!= null) {
                                        nextFromFlinkSort = 
sortedData.next(nextFromFlinkSort);
-                                       
+
                                        Assert.assertNotNull(nextFromFlinkSort);
                                        Assert.assertEquals(nextVerify, 
nextFromFlinkSort.getValue());
                                }
@@ -138,23 +142,23 @@ public class MassiveStringValueSorting {
                        }
                }
        }
-       
+
        @SuppressWarnings("unchecked")
        public void testStringValueTuplesSorting() {
-               final int NUM_STRINGS = 300000;
-               
+               final int numStrings = 300000;
+
                File input = null;
                File sorted = null;
 
                try {
                        // the source file
-                       input = generateFileWithStringTuples(NUM_STRINGS, 
"http://some-uri.com/that/is/a/common/prefix/to/all");
-                       
+                       input = generateFileWithStringTuples(numStrings, 
"http://some-uri.com/that/is/a/common/prefix/to/all");
+
                        // the sorted file
                        sorted = File.createTempFile("sorted_strings", "txt");
-                       
-                       String[] command = {"/bin/bash","-c","export 
LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + 
sorted.getAbsolutePath() + "\""};
-                       
+
+                       String[] command = {"/bin/bash", "-c", "export 
LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + 
sorted.getAbsolutePath() + "\""};
+
                        Process p = null;
                        try {
                                p = Runtime.getRuntime().exec(command);
@@ -168,33 +172,31 @@ public class MassiveStringValueSorting {
                                        p.destroy();
                                }
                        }
-                       
+
                        // sort the data
                        UnilateralSortMerger<Tuple2<StringValue, 
StringValue[]>> sorter = null;
                        BufferedReader reader = null;
                        BufferedReader verifyReader = null;
-                       
+
                        try {
                                MemoryManager mm = new MemoryManager(1024 * 
1024, 1);
                                IOManager ioMan = new IOManagerAsync();
-                                       
+
                                TupleTypeInfo<Tuple2<StringValue, 
StringValue[]>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, StringValue[]>>)
                                                
TypeInfoParser.<Tuple2<StringValue, 
StringValue[]>>parse("Tuple2<org.apache.flink.types.StringValue, 
org.apache.flink.types.StringValue[]>");
 
                                TypeSerializer<Tuple2<StringValue, 
StringValue[]>> serializer = typeInfo.createSerializer(new ExecutionConfig());
                                TypeComparator<Tuple2<StringValue, 
StringValue[]>> comparator = typeInfo.createComparator(new int[] { 0 }, new 
boolean[] { true }, 0, new ExecutionConfig());
-                               
+
                                reader = new BufferedReader(new 
FileReader(input));
                                MutableObjectIterator<Tuple2<StringValue, 
StringValue[]>> inputIterator = new 
StringValueTupleReaderMutableObjectIterator(reader);
-                               
+
                                sorter = new 
UnilateralSortMerger<Tuple2<StringValue, StringValue[]>>(mm, ioMan, 
inputIterator, new DummyInvokable(),
                                                new 
RuntimeSerializerFactory<Tuple2<StringValue, StringValue[]>>(serializer, 
(Class<Tuple2<StringValue, StringValue[]>>) (Class<?>) Tuple2.class), 
comparator, 1.0, 4, 0.8f,
                                                true /* use large record 
handler */, false);
 
-                               
-                               
                                // use this part to verify that all if good 
when sorting in memory
-                               
+
 //                             List<MemorySegment> memory = 
mm.allocatePages(new DummyInvokable(), mm.computeNumberOfPages(1024*1024*1024));
 //                             NormalizedKeySorter<Tuple2<String, String[]>> 
nks = new NormalizedKeySorter<Tuple2<String,String[]>>(serializer, comparator, 
memory);
 //
@@ -203,36 +205,36 @@ public class MassiveStringValueSorting {
 //                                     while ((wi = inputIterator.next(wi)) != 
null) {
 //                                             
Assert.assertTrue(nks.write(wi));
 //                                     }
-//                                     
+//
 //                                     new QuickSort().sort(nks);
 //                             }
-//                             
+//
 //                             MutableObjectIterator<Tuple2<String, String[]>> 
sortedData = nks.getIterator();
-                               
+
                                MutableObjectIterator<Tuple2<StringValue, 
StringValue[]>> sortedData = sorter.getIterator();
                                reader.close();
-                               
+
                                // verify
                                verifyReader = new BufferedReader(new 
FileReader(sorted));
                                MutableObjectIterator<Tuple2<StringValue, 
StringValue[]>> verifyIterator = new 
StringValueTupleReaderMutableObjectIterator(verifyReader);
-                               
+
                                Tuple2<StringValue, StringValue[]> nextVerify = 
new Tuple2<StringValue, StringValue[]>(new StringValue(), new StringValue[0]);
                                Tuple2<StringValue, StringValue[]> 
nextFromFlinkSort = new Tuple2<StringValue, StringValue[]>(new StringValue(), 
new StringValue[0]);
-                               
+
                                int num = 0;
-                               
+
                                while ((nextVerify = 
verifyIterator.next(nextVerify)) != null) {
                                        num++;
-                                       
+
                                        nextFromFlinkSort = 
sortedData.next(nextFromFlinkSort);
                                        Assert.assertNotNull(nextFromFlinkSort);
-                                       
+
                                        Assert.assertEquals(nextVerify.f0, 
nextFromFlinkSort.f0);
                                        Assert.assertArrayEquals(nextVerify.f1, 
nextFromFlinkSort.f1);
                                }
-                               
+
                                
Assert.assertNull(sortedData.next(nextFromFlinkSort));
-                               Assert.assertEquals(NUM_STRINGS, num);
+                               Assert.assertEquals(numStrings, num);
 
                        }
                        finally {
@@ -265,23 +267,23 @@ public class MassiveStringValueSorting {
        }
 
        // 
--------------------------------------------------------------------------------------------
-       
+
        private static final class StringValueReaderMutableObjectIterator 
implements MutableObjectIterator<StringValue> {
-               
+
                private final BufferedReader reader;
 
                public StringValueReaderMutableObjectIterator(BufferedReader 
reader) {
                        this.reader = reader;
                }
-               
+
                @Override
                public StringValue next(StringValue reuse) throws IOException {
                        String line = reader.readLine();
-                       
+
                        if (line == null) {
                                return null;
                        }
-                       
+
                        reuse.setValue(line);
                        return reuse;
                }
@@ -291,30 +293,30 @@ public class MassiveStringValueSorting {
                        return next(new StringValue());
                }
        }
-       
+
        private static final class StringValueTupleReaderMutableObjectIterator 
implements MutableObjectIterator<Tuple2<StringValue, StringValue[]>> {
-               
+
                private final BufferedReader reader;
 
                public 
StringValueTupleReaderMutableObjectIterator(BufferedReader reader) {
                        this.reader = reader;
                }
-               
+
                @Override
                public Tuple2<StringValue, StringValue[]> 
next(Tuple2<StringValue, StringValue[]> reuse) throws IOException {
                        String line = reader.readLine();
                        if (line == null) {
                                return null;
                        }
-                       
+
                        String[] parts = line.split(" ");
                        reuse.f0.setValue(parts[0]);
                        reuse.f1 = new StringValue[parts.length];
-                       
+
                        for (int i = 0; i < parts.length; i++) {
                                reuse.f1[i] = new StringValue(parts[i]);
                        }
-                       
+
                        return reuse;
                }
 
@@ -323,31 +325,31 @@ public class MassiveStringValueSorting {
                        return next(new Tuple2<StringValue, StringValue[]>(new 
StringValue(), new StringValue[0]));
                }
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
-       
+
        private File generateFileWithStrings(int numStrings, String prefix) 
throws IOException {
                final Random rnd = new Random(SEED);
-               
+
                final StringBuilder bld = new StringBuilder();
                final int resetValue = prefix.length();
-               
+
                bld.append(prefix);
-               
+
                File f = File.createTempFile("strings", "txt");
                BufferedWriter wrt = null;
                try {
                        wrt = new BufferedWriter(new FileWriter(f));
-               
-                       for (int i = 0 ; i < numStrings; i++) {
+
+                       for (int i = 0; i < numStrings; i++) {
                                bld.setLength(resetValue);
-                               
+
                                int len = rnd.nextInt(20) + 300;
                                for (int k = 0; k < len; k++) {
                                        char c = (char) (rnd.nextInt(80) + 40);
                                        bld.append(c);
                                }
-                               
+
                                String str = bld.toString();
                                wrt.write(str);
                                wrt.newLine();
@@ -357,40 +359,40 @@ public class MassiveStringValueSorting {
                                wrt.close();
                        }
                }
-               
+
                return f;
        }
-       
+
        private File generateFileWithStringTuples(int numStrings, String 
prefix) throws IOException {
                final Random rnd = new Random(SEED);
-               
+
                final StringBuilder bld = new StringBuilder();
 
                File f = File.createTempFile("strings", "txt");
                BufferedWriter wrt = null;
                try {
                        wrt = new BufferedWriter(new FileWriter(f));
-               
-                       for (int i = 0 ; i < numStrings; i++) {
+
+                       for (int i = 0; i < numStrings; i++) {
                                bld.setLength(0);
-                               
+
                                int numComps = rnd.nextInt(5) + 1;
-                               
+
                                for (int z = 0; z < numComps; z++) {
                                        if (z > 0) {
                                                bld.append(' ');
                                        }
                                        bld.append(prefix);
-                               
+
                                        int len = rnd.nextInt(20) + 10;
                                        for (int k = 0; k < len; k++) {
                                                char c = (char) 
(rnd.nextInt(80) + 40);
                                                bld.append(c);
                                        }
                                }
-                               
+
                                String str = bld.toString();
-                               
+
                                wrt.write(str);
                                wrt.newLine();
                        }
@@ -399,12 +401,12 @@ public class MassiveStringValueSorting {
                                wrt.close();
                        }
                }
-               
+
                return f;
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
-       
+
        public static void main(String[] args) {
                new MassiveStringValueSorting().testStringValueSorting();
                new MassiveStringValueSorting().testStringValueTuplesSorting();

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
 
b/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
index ee3b4b2..0b8fd1c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
@@ -38,14 +38,14 @@ import static org.junit.Assert.fail;
  * with a parallelism of 100.
  */
 public class NotSoMiniClusterIterations {
-       
+
        private static final int PARALLELISM = 100;
-       
+
        public static void main(String[] args) {
                if ((Runtime.getRuntime().maxMemory() >>> 20) < 5000) {
                        throw new RuntimeException("This test program needs to 
run with at least 5GB of heap space.");
                }
-               
+
                LocalFlinkMiniCluster cluster = null;
 
                try {
@@ -55,7 +55,7 @@ public class NotSoMiniClusterIterations {
                        
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
                        
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1000);
                        
config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 8 * 1024);
-                       
+
                        config.setInteger("taskmanager.net.server.numThreads", 
1);
                        config.setInteger("taskmanager.net.client.numThreads", 
1);
 

Reply via email to