http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountPOJOITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountPOJOITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountPOJOITCase.java
deleted file mode 100644
index 0a5732c..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountPOJOITCase.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-//
-//package org.apache.flink.test.exampleJavaPrograms;
-//
-////import org.apache.flink.examples.java.wordcount.WordCountPOJO;
-//import org.apache.flink.test.testdata.WordCountData;
-//import org.apache.flink.test.util.JavaProgramTestBase;
-//import org.junit.Ignore;
-//
-//@Ignore
-//public class WordCountPOJOITCase extends JavaProgramTestBase {
-//
-//     protected String textPath;
-//     protected String resultPath;
-//
-//     
-//     @Override
-//     protected void preSubmit() throws Exception {
-//             textPath = createTempFile("text.txt", WordCountData.TEXT);
-//             resultPath = getTempDirPath("result");
-//     }
-//
-//     @Override
-//     protected void postSubmit() throws Exception {
-//             compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath);
-//     }
-//     
-//     @Override
-//     protected void testProgram() throws Exception {
-//             WordCountPOJO.main(new String[]{textPath, resultPath});
-//     }
-//}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSimplePOJOITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSimplePOJOITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSimplePOJOITCase.java
new file mode 100644
index 0000000..7d20597
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSimplePOJOITCase.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.exampleJavaPrograms;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+
+public class WordCountSimplePOJOITCase extends JavaProgramTestBase implements 
Serializable {
+       private static final long serialVersionUID = 1L;
+       protected String textPath;
+       protected String resultPath;
+
+       
+       @Override
+       protected void preSubmit() throws Exception {
+               textPath = createTempFile("text.txt", WordCountData.TEXT);
+               resultPath = getTempDirPath("result");
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath);
+       }
+       
+       @Override
+       protected void testProgram() throws Exception {
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<String> text = env.readTextFile(textPath);
+
+               DataSet<WC> counts = text
+                               .flatMap(new Tokenizer())
+                               .groupBy("word")
+                               .reduce(new ReduceFunction<WC>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       public WC reduce(WC value1, WC value2) {
+                                               return new WC(value1.word, 
value1.count + value2.count);
+                                       }
+                               });
+
+               counts.writeAsText(resultPath);
+
+               env.execute("WordCount with custom data types example");
+       }
+
+       public static final class Tokenizer implements FlatMapFunction<String, 
WC> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void flatMap(String value, Collector<WC> out) {
+                       // normalize and split the line
+                       String[] tokens = value.toLowerCase().split("\\W+");
+
+                       // emit the pairs
+                       for (String token : tokens) {
+                               if (token.length() > 0) {
+                                       out.collect(new WC(token, 1));
+                               }
+                       }
+               }
+       }
+
+       public static class WC {
+               public WC() {}
+               public WC(String w, int c) {
+                       word = w;
+                       count = c;
+               }
+               public String word;
+               public int count;
+               @Override
+               public String toString() {
+                       return word + " " + count;
+               }
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java
index 9701086..4f00b1d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.test.exampleScalaPrograms;
 
-import org.apache.flink.examples.scala.graph.EnumTrianglesOpt;
+import org.apache.flink.examples.java.graph.EnumTrianglesOpt;
 import org.apache.flink.test.testdata.EnumTriangleData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
index d14c2e1..42c6a8e 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
@@ -18,19 +18,19 @@
 
 package org.apache.flink.test.exampleScalaPrograms;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.examples.scala.graph.PageRankBasic;
+import org.apache.flink.examples.java.graph.PageRankBasic;
 import org.apache.flink.test.testdata.PageRankData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
 @RunWith(Parameterized.class)
 public class PageRankITCase extends JavaProgramTestBase {
        

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
index ce31ea2..e607ba7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
@@ -32,8 +32,6 @@ public final class VertexWithAdjacencyListComparator extends 
TypeComparator<Vert
        
        private long reference;
 
-       private Comparable[] extractedKey = new Comparable[1];
-
        private TypeComparator[] comparators = new TypeComparator[]{new 
LongComparator(true)};
 
        @Override
@@ -136,13 +134,13 @@ public final class VertexWithAdjacencyListComparator 
extends TypeComparator<Vert
        }
 
        @Override
-       public Object[] extractKeys(VertexWithAdjacencyList record) {
-               extractedKey[0] = record.getVertexID();
-               return extractedKey;
+       public int extractKeys(Object record, Object[] target, int index) {
+               target[index] = ((VertexWithAdjacencyList) 
record).getVertexID();
+               return 1;
        }
 
        @Override
-       public TypeComparator[] getComparators() {
+       public TypeComparator[] getFlatComparators() {
                return comparators;
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
index b9503ae..dd9d4dd 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
@@ -32,8 +32,6 @@ public final class VertexWithRankAndDanglingComparator 
extends TypeComparator<Ve
        
        private long reference;
 
-       private Comparable[] extractedKey = new Comparable[1];
-
        private TypeComparator[] comparators = new TypeComparator[]{new 
LongComparator(true)};
 
        @Override
@@ -141,13 +139,13 @@ public final class VertexWithRankAndDanglingComparator 
extends TypeComparator<Ve
        }
 
        @Override
-       public Object[] extractKeys(VertexWithRankAndDangling record) {
-               extractedKey[0] = record.getVertexID();
-               return extractedKey;
+       public int extractKeys(Object record, Object[] target, int index) {
+               target[index] = ((VertexWithRankAndDangling) 
record).getVertexID();
+               return 1;
        }
 
        @Override
-       public TypeComparator[] getComparators() {
+       public TypeComparator[] getFlatComparators() {
                return comparators;
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
index cfe4a9f..b63f724 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
@@ -32,8 +32,6 @@ public final class VertexWithRankComparator extends 
TypeComparator<VertexWithRan
        
        private long reference;
 
-       private Comparable[] extractedKey = new Comparable[1];
-
        private TypeComparator[] comparators = new TypeComparator[]{new 
LongComparator(true)};
 
        @Override
@@ -139,13 +137,13 @@ public final class VertexWithRankComparator extends 
TypeComparator<VertexWithRan
        }
 
        @Override
-       public Object[] extractKeys(VertexWithRank record) {
-               extractedKey[0] = record.getVertexID();
-               return extractedKey;
+       public int extractKeys(Object record, Object[] target, int index) {
+               target[index] = ((VertexWithRank) record).getVertexID();
+               return 1;
        }
 
        @Override
-       public TypeComparator[] getComparators() {
+       public TypeComparator[] getFlatComparators() {
                return comparators;
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
index ce9aa65..f0e89df 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
@@ -26,18 +26,22 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple7;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
+import org.junit.Assert;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
@@ -45,7 +49,7 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class CoGroupITCase extends JavaProgramTestBase {
        
-       private static int NUM_PROGRAMS = 9;
+       private static int NUM_PROGRAMS = 13;
        
        private int curProgId = config.getInteger("ProgramId", -1);
        private String resultPath;
@@ -356,6 +360,149 @@ public class CoGroupITCase extends JavaProgramTestBase {
                                                "5,3,HIJ\n" +
                                                "5,3,IJK\n";
                        }
+                       case 10: {
+                               /*
+                                * CoGroup on two custom type inputs using 
expression keys
+                                */
+                               
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               
+                               DataSet<CustomType> ds = 
CollectionDataSets.getCustomTypeDataSet(env);
+                               DataSet<CustomType> ds2 = 
CollectionDataSets.getCustomTypeDataSet(env);
+                               DataSet<CustomType> coGroupDs = 
ds.coGroup(ds2).where("myInt").equalTo("myInt").with(new CustomTypeCoGroup());
+                               
+                               coGroupDs.writeAsText(resultPath);
+                               env.execute();
+                               
+                               // return expected result
+                               return "1,0,test\n" +
+                                               "2,6,test\n" +
+                                               "3,24,test\n" +
+                                               "4,60,test\n" +
+                                               "5,120,test\n" +
+                                               "6,210,test\n";
+                       }
+                       case 11: {
+                               /*
+                                * CoGroup on two custom type inputs using 
expression keys
+                                */
+                               
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               
+                               DataSet<POJO> ds = 
CollectionDataSets.getSmallPojoDataSet(env);
+                               DataSet<Tuple7<Integer, String, Integer, 
Integer, Long, String, Long>> ds2 = 
CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env);
+                               DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
+                                               
.where("nestedPojo.longNumber").equalTo(6).with(new CoGroupFunction<POJO, 
Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType>() {
+                                               private static final long 
serialVersionUID = 1L;
+
+                                               @Override
+                                               public void coGroup(
+                                                               Iterable<POJO> 
first,
+                                                               
Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
+                                                               
Collector<CustomType> out) throws Exception {
+                                                       for(POJO p : first) {
+                                                               
for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
+                                                                       
Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
+                                                                       
out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
+                                                               }
+                                                       }
+                                               }
+                               });
+                               coGroupDs.writeAsText(resultPath);
+                               env.execute();
+                               
+                               // return expected result
+                               return  "-1,20000,Flink\n" +
+                                               "-1,10000,Flink\n" +
+                                               "-1,30000,Flink\n";
+                       }
+                       case 12: {
+                               /*
+                                * CoGroup field-selector (expression keys) + 
key selector function
+                                * The key selector is unnecessary complicated 
(Tuple1) ;)
+                                */
+                               
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               
+                               DataSet<POJO> ds = 
CollectionDataSets.getSmallPojoDataSet(env);
+                               DataSet<Tuple7<Integer, String, Integer, 
Integer, Long, String, Long>> ds2 = 
CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env);
+                               DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
+                                               .where(new KeySelector<POJO, 
Tuple1<Long>>() {
+                                                       private static final 
long serialVersionUID = 1L;
+
+                                                       @Override
+                                                       public Tuple1<Long> 
getKey(POJO value)
+                                                                       throws 
Exception {
+                                                               return new 
Tuple1<Long>(value.nestedPojo.longNumber);
+                                                       }
+                                               }).equalTo(6).with(new 
CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, 
Long>, CustomType>() {
+                                                       private static final 
long serialVersionUID = 1L;
+
+                                               @Override
+                                               public void coGroup(
+                                                               Iterable<POJO> 
first,
+                                                               
Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
+                                                               
Collector<CustomType> out) throws Exception {
+                                                       for(POJO p : first) {
+                                                               
for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
+                                                                       
Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
+                                                                       
out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
+                                                               }
+                                                       }
+                                               }
+                               });
+                               coGroupDs.writeAsText(resultPath);
+                               env.execute();
+                               
+                               // return expected result
+                               return  "-1,20000,Flink\n" +
+                                               "-1,10000,Flink\n" +
+                                               "-1,30000,Flink\n";
+                       }
+                       case 13: {
+                               /*
+                                * CoGroup field-selector (expression keys) + 
key selector function
+                                * The key selector is simple here
+                                */
+                               
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               
+                               DataSet<POJO> ds = 
CollectionDataSets.getSmallPojoDataSet(env);
+                               DataSet<Tuple7<Integer, String, Integer, 
Integer, Long, String, Long>> ds2 = 
CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env);
+                               DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
+                                               .where(new KeySelector<POJO, 
Long>() {
+                                                       private static final 
long serialVersionUID = 1L;
+
+                                                       @Override
+                                                       public Long getKey(POJO 
value)
+                                                                       throws 
Exception {
+                                                               return 
value.nestedPojo.longNumber;
+                                                       }
+                                               }).equalTo(6).with(new 
CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, 
Long>, CustomType>() {
+                                                       private static final 
long serialVersionUID = 1L;
+
+                                               @Override
+                                               public void coGroup(
+                                                               Iterable<POJO> 
first,
+                                                               
Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
+                                                               
Collector<CustomType> out) throws Exception {
+                                                       for(POJO p : first) {
+                                                               
for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
+                                                                       
Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
+                                                                       
out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
+                                                               }
+                                                       }
+                                               }
+                               });
+                               coGroupDs.writeAsText(resultPath);
+                               env.execute();
+                               
+                               // return expected result
+                               return  "-1,20000,Flink\n" +
+                                               "-1,10000,Flink\n" +
+                                               "-1,30000,Flink\n";
+                       }
+                       
                        default: 
                                throw new IllegalArgumentException("Invalid 
program id");
                        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 00b68fc..fce56d1 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -35,7 +35,12 @@ import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.compiler.PactCompiler;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CrazyNested;
 import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
+import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.FromTuple;
+import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.FromTupleWithCTor;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
+import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.PojoContainingTupleAndWritable;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
 import org.junit.runner.RunWith;
@@ -48,7 +53,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 @RunWith(Parameterized.class)
 public class GroupReduceITCase extends JavaProgramTestBase {
        
-       private static int NUM_PROGRAMS = 15;
+       private static int NUM_PROGRAMS = 19;
        
        private int curProgId = config.getInteger("ProgramId", -1);
        private String resultPath;
@@ -406,7 +411,6 @@ public class GroupReduceITCase extends JavaProgramTestBase {
                                /*
                                 * check correctness of groupReduce with 
descending group sort
                                 */
-
                                        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                                        env.setDegreeOfParallelism(1);
 
@@ -484,6 +488,117 @@ public class GroupReduceITCase extends 
JavaProgramTestBase {
                                                        "16,6,Comment#10\n";
                                        
                                }
+                               case 16: {
+                                       /*
+                                        * Deep nesting test
+                                        * + null value in pojo
+                                        */
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                                       
+                                       DataSet<CrazyNested> ds = 
CollectionDataSets.getCrazyNestedDataSet(env);
+                                       DataSet<Tuple2<String, Integer>> 
reduceDs = ds.groupBy("nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal")
+                                                       .reduceGroup(new 
GroupReduceFunction<CollectionDataSets.CrazyNested, Tuple2<String, Integer>>() {
+                                                               private static 
final long serialVersionUID = 1L;
+
+                                                               @Override
+                                                               public void 
reduce(Iterable<CrazyNested> values,
+                                                                               
Collector<Tuple2<String, Integer>> out)
+                                                                               
throws Exception {
+                                                                       int c = 
0; String n = null;
+                                                                       
for(CrazyNested v : values) {
+                                                                               
c++; // haha
+                                                                               
n = v.nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal;
+                                                                       }
+                                                                       
out.collect(new Tuple2<String, Integer>(n,c));
+                                                               }});
+                                       
+                                       reduceDs.writeAsCsv(resultPath);
+                                       env.execute();
+                                       
+                                       // return expected result
+                                       return "aa,1\nbb,2\ncc,3\n";
+                               } 
+                               case 17: {
+                                       /*
+                                        * Test Pojo extending from tuple WITH 
custom fields
+                                        */
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                                       
+                                       DataSet<FromTupleWithCTor> ds = 
CollectionDataSets.getPojoExtendingFromTuple(env);
+                                       DataSet<Integer> reduceDs = 
ds.groupBy("special", "f2")
+                                                       .reduceGroup(new 
GroupReduceFunction<FromTupleWithCTor, Integer>() {
+                                                               private static 
final long serialVersionUID = 1L;
+                                                               @Override
+                                                               public void 
reduce(Iterable<FromTupleWithCTor> values,
+                                                                               
Collector<Integer> out)
+                                                                               
throws Exception {
+                                                                       int c = 
0;
+                                                                       
for(FromTuple v : values) {
+                                                                               
c++;
+                                                                       }
+                                                                       
out.collect(c);
+                                                               }});
+                                       
+                                       reduceDs.writeAsText(resultPath);
+                                       env.execute();
+                                       
+                                       // return expected result
+                                       return "3\n2\n";
+                               } 
+                               case 18: {
+                                       /*
+                                        * Test Pojo containing a Writable and 
Tuples
+                                        */
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                                       
+                                       DataSet<PojoContainingTupleAndWritable> 
ds = CollectionDataSets.getPojoContainingTupleAndWritable(env);
+                                       DataSet<Integer> reduceDs = 
ds.groupBy("hadoopFan", "theTuple.*") // full tuple selection
+                                                       .reduceGroup(new 
GroupReduceFunction<PojoContainingTupleAndWritable, Integer>() {
+                                                               private static 
final long serialVersionUID = 1L;
+                                                               @Override
+                                                               public void 
reduce(Iterable<PojoContainingTupleAndWritable> values,
+                                                                               
Collector<Integer> out)
+                                                                               
throws Exception {
+                                                                       int c = 
0;
+                                                                       
for(PojoContainingTupleAndWritable v : values) {
+                                                                               
c++;
+                                                                       }
+                                                                       
out.collect(c);
+                                                               }});
+                                       
+                                       reduceDs.writeAsText(resultPath);
+                                       env.execute();
+                                       
+                                       // return expected result
+                                       return "1\n5\n";
+                               } 
+                               case 19: {
+                                       /*
+                                        * Test Tuple containing pojos and 
regular fields
+                                        */
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                                       
+                                       DataSet<Tuple3<Integer,CrazyNested, 
POJO>> ds = CollectionDataSets.getTupleContainingPojos(env);
+                                       DataSet<Integer> reduceDs = 
ds.groupBy("f0", "f1.*") // nested full tuple selection
+                                                       .reduceGroup(new 
GroupReduceFunction<Tuple3<Integer,CrazyNested, POJO>, Integer>() {
+                                                               private static 
final long serialVersionUID = 1L;
+                                                               @Override
+                                                               public void 
reduce(Iterable<Tuple3<Integer,CrazyNested, POJO>> values,
+                                                                               
Collector<Integer> out)
+                                                                               
throws Exception {
+                                                                       int c = 
0;
+                                                                       
for(Tuple3<Integer,CrazyNested, POJO> v : values) {
+                                                                               
c++;
+                                                                       }
+                                                                       
out.collect(c);
+                                                               }});
+                                       
+                                       reduceDs.writeAsText(resultPath);
+                                       env.execute();
+                                       
+                                       // return expected result
+                                       return "3\n1\n";
+                               } 
                                default: {
                                        throw new 
IllegalArgumentException("Invalid program id");
                                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
index 433a076..e8d8be9 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
@@ -31,9 +31,11 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
 import org.junit.runner.RunWith;
@@ -46,7 +48,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 @RunWith(Parameterized.class)
 public class JoinITCase extends JavaProgramTestBase {
        
-       private static int NUM_PROGRAMS = 14;
+       private static int NUM_PROGRAMS = 21;
        
        private int curProgId = config.getInteger("ProgramId", -1);
        private String resultPath;
@@ -493,8 +495,175 @@ public class JoinITCase extends JavaProgramTestBase {
                                                "I am fine.,HIJ\n" +
                                                "I am fine.,IJK\n";
                        }
+                       /**
+                        *  Joins with POJOs
+                        */
+                       case 15: {
+                               /*
+                                * Join nested pojo against tuple (selected 
using a string)
+                                */
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               
+                               DataSet<POJO> ds1 = 
CollectionDataSets.getSmallPojoDataSet(env);
+                               DataSet<Tuple7<Integer, String, Integer, 
Integer, Long, String, Long>> ds2 = 
CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env);
+                               DataSet<Tuple2<POJO, Tuple7<Integer, String, 
Integer, Integer, Long, String, Long> >> joinDs = 
+                                               
ds1.join(ds2).where("nestedPojo.longNumber").equalTo("f6");
+                               
+                               joinDs.writeAsCsv(resultPath);
+                               env.execute();
+                               
+                               // return expected result
+                               return "1 First (10,100,1000,One) 
10000,(1,First,10,100,1000,One,10000)\n" +
+                                          "2 Second (20,200,2000,Two) 
20000,(2,Second,20,200,2000,Two,20000)\n" +
+                                          "3 Third (30,300,3000,Three) 
30000,(3,Third,30,300,3000,Three,30000)\n";
+                       }
+                       
+                       case 16: {
+                               /*
+                                * Join nested pojo against tuple (selected as 
an integer)
+                                */
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               
+                               DataSet<POJO> ds1 = 
CollectionDataSets.getSmallPojoDataSet(env);
+                               DataSet<Tuple7<Integer, String, Integer, 
Integer, Long, String, Long>> ds2 = 
CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env);
+                               DataSet<Tuple2<POJO, Tuple7<Integer, String, 
Integer, Integer, Long, String, Long> >> joinDs = 
+                                               
ds1.join(ds2).where("nestedPojo.longNumber").equalTo(6); // <--- difference!
+                               
+                               joinDs.writeAsCsv(resultPath);
+                               env.execute();
+                               
+                               // return expected result
+                               return "1 First (10,100,1000,One) 
10000,(1,First,10,100,1000,One,10000)\n" +
+                                          "2 Second (20,200,2000,Two) 
20000,(2,Second,20,200,2000,Two,20000)\n" +
+                                          "3 Third (30,300,3000,Three) 
30000,(3,Third,30,300,3000,Three,30000)\n";
+                       }
+                       case 17: {
+                               /*
+                                * selecting multiple fields using expression 
language
+                                */
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               
+                               DataSet<POJO> ds1 = 
CollectionDataSets.getSmallPojoDataSet(env);
+                               DataSet<Tuple7<Integer, String, Integer, 
Integer, Long, String, Long>> ds2 = 
CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env);
+                               DataSet<Tuple2<POJO, Tuple7<Integer, String, 
Integer, Integer, Long, String, Long> >> joinDs = 
+                                               
ds1.join(ds2).where("nestedPojo.longNumber", "number", 
"str").equalTo("f6","f0","f1");
+                               
+                               joinDs.writeAsCsv(resultPath);
+                               env.setDegreeOfParallelism(1);
+                               env.execute();
+                               
+                               // return expected result
+                               return "1 First (10,100,1000,One) 
10000,(1,First,10,100,1000,One,10000)\n" +
+                                          "2 Second (20,200,2000,Two) 
20000,(2,Second,20,200,2000,Two,20000)\n" +
+                                          "3 Third (30,300,3000,Three) 
30000,(3,Third,30,300,3000,Three,30000)\n";
+                               
+                       }
+                       case 18: {
+                               /*
+                                * nested into tuple
+                                */
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               
+                               DataSet<POJO> ds1 = 
CollectionDataSets.getSmallPojoDataSet(env);
+                               DataSet<Tuple7<Integer, String, Integer, 
Integer, Long, String, Long>> ds2 = 
CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env);
+                               DataSet<Tuple2<POJO, Tuple7<Integer, String, 
Integer, Integer, Long, String, Long> >> joinDs = 
+                                               
ds1.join(ds2).where("nestedPojo.longNumber", 
"number","nestedTupleWithCustom.f0").equalTo("f6","f0","f2");
+                               
+                               joinDs.writeAsCsv(resultPath);
+                               env.setDegreeOfParallelism(1);
+                               env.execute();
+                               
+                               // return expected result
+                               return "1 First (10,100,1000,One) 
10000,(1,First,10,100,1000,One,10000)\n" +
+                                          "2 Second (20,200,2000,Two) 
20000,(2,Second,20,200,2000,Two,20000)\n" +
+                                          "3 Third (30,300,3000,Three) 
30000,(3,Third,30,300,3000,Three,30000)\n";
+                               
+                       }
+                       case 19: {
+                               /*
+                                * nested into tuple into pojo
+                                */
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               
+                               DataSet<POJO> ds1 = 
CollectionDataSets.getSmallPojoDataSet(env);
+                               DataSet<Tuple7<Integer, String, Integer, 
Integer, Long, String, Long>> ds2 = 
CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env);
+                               DataSet<Tuple2<POJO, Tuple7<Integer, String, 
Integer, Integer, Long, String, Long> >> joinDs = 
+                                               
ds1.join(ds2).where("nestedTupleWithCustom.f0","nestedTupleWithCustom.f1.myInt","nestedTupleWithCustom.f1.myLong").equalTo("f2","f3","f4");
+                               
+                               joinDs.writeAsCsv(resultPath);
+                               env.setDegreeOfParallelism(1);
+                               env.execute();
+                               
+                               // return expected result
+                               return "1 First (10,100,1000,One) 
10000,(1,First,10,100,1000,One,10000)\n" +
+                                          "2 Second (20,200,2000,Two) 
20000,(2,Second,20,200,2000,Two,20000)\n" +
+                                          "3 Third (30,300,3000,Three) 
30000,(3,Third,30,300,3000,Three,30000)\n";
+                               
+                       }
+                       case 20: {
+                               /*
+                                * Non-POJO test to verify that full-tuple keys 
are working.
+                                */
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               
+                               DataSet<Tuple2<Tuple2<Integer, Integer>, 
String>> ds1 = CollectionDataSets.getSmallNestedTupleDataSet(env);
+                               DataSet<Tuple2<Tuple2<Integer, Integer>, 
String>> ds2 = CollectionDataSets.getSmallNestedTupleDataSet(env);
+                               DataSet<Tuple2<Tuple2<Tuple2<Integer, Integer>, 
String>, Tuple2<Tuple2<Integer, Integer>, String> >> joinDs = 
+                                               
ds1.join(ds2).where(0).equalTo("f0.f0", "f0.f1"); // key is now Tuple2<Integer, 
Integer>
+                               
+                               joinDs.writeAsCsv(resultPath);
+                               env.setDegreeOfParallelism(1);
+                               env.execute();
+                               
+                               // return expected result
+                               return "((1,1),one),((1,1),one)\n" +
+                                          "((2,2),two),((2,2),two)\n" +
+                                          "((3,3),three),((3,3),three)\n";
+                               
+                       }
+                       case 21: {
+                               /*
+                                * Non-POJO test to verify "nested" 
tuple-element selection.
+                                */
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               
+                               DataSet<Tuple2<Tuple2<Integer, Integer>, 
String>> ds1 = CollectionDataSets.getSmallNestedTupleDataSet(env);
+                               DataSet<Tuple2<Tuple2<Integer, Integer>, 
String>> ds2 = CollectionDataSets.getSmallNestedTupleDataSet(env);
+                               DataSet<Tuple2<Tuple2<Tuple2<Integer, Integer>, 
String>, Tuple2<Tuple2<Integer, Integer>, String> >> joinDs = 
+                                               
ds1.join(ds2).where("f0.f0").equalTo("f0.f0"); // key is now Integer from 
Tuple2<Integer, Integer>
+                               
+                               joinDs.writeAsCsv(resultPath);
+                               env.setDegreeOfParallelism(1);
+                               env.execute();
+                               
+                               // return expected result
+                               return "((1,1),one),((1,1),one)\n" +
+                                          "((2,2),two),((2,2),two)\n" +
+                                          "((3,3),three),((3,3),three)\n";
+                               
+                       }
+                       case 22: {
+                               /*
+                                * full pojo with full tuple
+                                */
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               
+                               DataSet<POJO> ds1 = 
CollectionDataSets.getSmallPojoDataSet(env);
+                               DataSet<Tuple7<Integer, String, Integer, 
Integer, Long, String, Long>> ds2 = 
CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env);
+                               DataSet<Tuple2<POJO, Tuple7<Integer, String, 
Integer, Integer, Long, String, Long> >> joinDs = 
+                                               
ds1.join(ds2).where("*").equalTo("*");
+                               
+                               joinDs.writeAsCsv(resultPath);
+                               env.setDegreeOfParallelism(1);
+                               env.execute();
+                               
+                               // return expected result
+                               return "1 First (10,100,1000,One) 
10000,(1,First,10,100,1000,One,10000)\n" +
+                                          "2 Second (20,200,2000,Two) 
20000,(2,Second,20,200,2000,Two,20000)\n" +
+                                          "3 Third (30,300,3000,Three) 
30000,(3,Third,30,300,3000,Three,30000)\n";
+                       }
                        default: 
-                               throw new IllegalArgumentException("Invalid 
program id");
+                               throw new IllegalArgumentException("Invalid 
program id: "+progId);
                        }
                        
                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
index afefef9..bf1d404 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
@@ -45,7 +45,7 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class PartitionITCase extends JavaProgramTestBase {
        
-       private static int NUM_PROGRAMS = 1;
+       private static int NUM_PROGRAMS = 3;
        
        private int curProgId = config.getInteger("ProgramId", -1);
        private String resultPath;
@@ -111,7 +111,7 @@ public class PartitionITCase extends JavaProgramTestBase {
                                                "5\n" +
                                                "6\n";
                        }
-                       case 2: {
+                       case 1: {
                                /*
                                 * Test hash partition by key selector
                                 */
@@ -141,7 +141,7 @@ public class PartitionITCase extends JavaProgramTestBase {
                                                "5\n" +
                                                "6\n";
                        }
-                       case 1: {
+                       case 2: {
                                /*
                                 * Test forced rebalancing
                                 */
@@ -200,7 +200,7 @@ public class PartitionITCase extends JavaProgramTestBase {
                                // return expected result
                                return result.toString();
                        }
-                       case 4: {
+                       case 3: {
                                /*
                                 * Test hash partition by key field and 
different DOP
                                 */

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
index 10ea882..a1957f9 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
@@ -42,7 +42,7 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class ReduceITCase extends JavaProgramTestBase {
        
-       private static int NUM_PROGRAMS = 9;
+       private static int NUM_PROGRAMS = 10;
        
        private int curProgId = config.getInteger("ProgramId", -1);
        private String resultPath;
@@ -305,6 +305,33 @@ public class ReduceITCase extends JavaProgramTestBase {
                                                "5,29,0,P-),2\n" +
                                                "5,25,0,P-),3\n";
                        }
+                       case 10: {
+                               /*
+                                * Case 2 with String-based field expression
+                                */
+                               
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               
+                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> reduceDs = ds.
+                                               groupBy("f4","f0").reduce(new 
Tuple5Reduce());
+                               
+                               reduceDs.writeAsCsv(resultPath);
+                               env.execute();
+                               
+                               // return expected result
+                               return "1,1,0,Hallo,1\n" +
+                                               "2,3,2,Hallo Welt wie,1\n" +
+                                               "2,2,1,Hallo Welt,2\n" +
+                                               "3,9,0,P-),2\n" +
+                                               "3,6,5,BCD,3\n" +
+                                               "4,17,0,P-),1\n" +
+                                               "4,17,0,P-),2\n" +
+                                               "5,11,10,GHI,1\n" +
+                                               "5,29,0,P-),2\n" +
+                                               "5,25,0,P-),3\n";
+                       } 
+                       
                        default:
                                throw new IllegalArgumentException("Invalid 
program id");
                        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index 3ca8f31..b657545 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -24,11 +24,14 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple7;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.hadoop.io.IntWritable;
 
 /**
  * 
#######################################################################################################
@@ -136,6 +139,22 @@ public class CollectionDataSets {
                return env.fromCollection(data, type);
        }
        
+       public static DataSet<Tuple2<Tuple2<Integer, Integer>, String>> 
getSmallNestedTupleDataSet(ExecutionEnvironment env) {
+               
+               List<Tuple2<Tuple2<Integer, Integer>, String>> data = new 
ArrayList<Tuple2<Tuple2<Integer, Integer>, String>>();
+               data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new 
Tuple2<Integer, Integer>(1,1), "one"));
+               data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new 
Tuple2<Integer, Integer>(2,2), "two"));
+               data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new 
Tuple2<Integer, Integer>(3,3), "three"));
+               
+               TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, String>> type = 
new 
+                               TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, 
String>>(
+                                               new 
TupleTypeInfo<Tuple2<Integer, 
Integer>>(BasicTypeInfo.INT_TYPE_INFO,BasicTypeInfo.INT_TYPE_INFO),
+                                               BasicTypeInfo.STRING_TYPE_INFO
+                               );
+               
+               return env.fromCollection(data, type);
+       }
+       
        public static DataSet<String> getStringDataSet(ExecutionEnvironment 
env) {
                
                List<String> data = new ArrayList<String>();
@@ -241,7 +260,150 @@ public class CollectionDataSets {
                public String toString() {
                        return myInt+","+myLong+","+myString;
                }
-               
+       }
+       
+       public static DataSet<Tuple7<Integer, String, Integer, Integer, Long, 
String, Long>> getSmallTuplebasedPojoMatchingDataSet(ExecutionEnvironment env) {
+               List<Tuple7<Integer, String, Integer, Integer, Long, String, 
Long>> data = new ArrayList<Tuple7<Integer, String, Integer, Integer, Long, 
String, Long>>();
+               data.add(new Tuple7<Integer, String, Integer, Integer, Long, 
String, Long>(1, "First",10, 100, 1000L, "One", 10000L));
+               data.add(new Tuple7<Integer, String, Integer, Integer, Long, 
String, Long>(2, "Second",20, 200, 2000L, "Two", 20000L));
+               data.add(new Tuple7<Integer, String, Integer, Integer, Long, 
String, Long>(3, "Third",30, 300, 3000L, "Three", 30000L));
+               return env.fromCollection(data);
+       }
+       
+       public static DataSet<POJO> getSmallPojoDataSet(ExecutionEnvironment 
env) {
+               List<POJO> data = new ArrayList<POJO>();
+               data.add(new POJO(1, "First",10, 100, 1000L, "One", 10000L));
+               data.add(new POJO(2, "Second",20, 200, 2000L, "Two", 20000L));
+               data.add(new POJO(3, "Third",30, 300, 3000L, "Three", 30000L));
+               return env.fromCollection(data);
+       }
+       
+       public static class POJO {
+               public int number;
+               public String str;
+               public Tuple2<Integer, CustomType> nestedTupleWithCustom;
+               public NestedPojo nestedPojo;
+               public transient Long ignoreMe;
+               public POJO(int i0, String s0, 
+                                               int i1, int i2, long l0, String 
s1,
+                                               long l1) {
+                       this.number = i0;
+                       this.str = s0;
+                       this.nestedTupleWithCustom = new Tuple2<Integer, 
CustomType>(i1, new CustomType(i2, l0, s1));
+                       this.nestedPojo = new NestedPojo();
+                       this.nestedPojo.longNumber = l1;
+               }
+               public POJO() {}
+               @Override
+               public String toString() {
+                       return number+" "+str+" "+nestedTupleWithCustom+" 
"+nestedPojo.longNumber;
+               }
+       }
+       
+       public static class NestedPojo {
+               public static Object ignoreMe;
+               public long longNumber;
+               public NestedPojo() {}
+       }
+       
+       public static DataSet<CrazyNested> 
getCrazyNestedDataSet(ExecutionEnvironment env) {
+               List<CrazyNested> data = new ArrayList<CrazyNested>();
+               data.add(new CrazyNested("aa"));
+               data.add(new CrazyNested("bb"));
+               data.add(new CrazyNested("bb"));
+               data.add(new CrazyNested("cc"));
+               data.add(new CrazyNested("cc"));
+               data.add(new CrazyNested("cc"));
+               return env.fromCollection(data);
+       }
+       
+       public static class CrazyNested {
+               public CrazyNestedL1 nest_Lvl1;
+               public Long something; // test proper null-value handling
+               public CrazyNested() {}
+               public CrazyNested(String set, String second, long s) { // 
additional CTor to set all fields to non-null values
+                       this(set);
+                       something = s;
+                       nest_Lvl1.a = second;
+               }
+               public CrazyNested(String set) {
+                       nest_Lvl1 = new CrazyNestedL1();
+                       nest_Lvl1.nest_Lvl2 = new CrazyNestedL2();
+                       nest_Lvl1.nest_Lvl2.nest_Lvl3 = new CrazyNestedL3();
+                       nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4 = new 
CrazyNestedL4();
+                       nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal = set;
+               }
+       }
+       public static class CrazyNestedL1 {
+               public String a;
+               public int b;
+               public CrazyNestedL2 nest_Lvl2;
+       }
+       public static class CrazyNestedL2 {
+               public CrazyNestedL3 nest_Lvl3;
+       }
+       public static class CrazyNestedL3 {
+               public CrazyNestedL4 nest_Lvl4;
+       }
+       public static class CrazyNestedL4 {
+               public String f1nal;
+       }
+       
+       // Copied from TypeExtractorTest
+       public static class FromTuple extends Tuple3<String, String, Long> {
+               private static final long serialVersionUID = 1L;
+               public int special;
+       }
+       
+       public static class FromTupleWithCTor extends FromTuple {
+               public FromTupleWithCTor() {}
+               public FromTupleWithCTor(int special, long tupleField ) {
+                       this.special = special;
+                       this.setField(tupleField, 2);
+               }
+       }
+       public static DataSet<FromTupleWithCTor> 
getPojoExtendingFromTuple(ExecutionEnvironment env) {
+               List<FromTupleWithCTor> data = new 
ArrayList<FromTupleWithCTor>();
+               data.add(new FromTupleWithCTor(1, 10L)); // 3x
+               data.add(new FromTupleWithCTor(1, 10L));
+               data.add(new FromTupleWithCTor(1, 10L));
+               data.add(new FromTupleWithCTor(2, 20L)); // 2x
+               data.add(new FromTupleWithCTor(2, 20L));
+               return env.fromCollection(data);
+       }
+       
+       public static class PojoContainingTupleAndWritable {
+               public int someInt;
+               public String someString;
+               public IntWritable hadoopFan;
+               public Tuple2<Long, Long> theTuple;
+               public PojoContainingTupleAndWritable() {}
+               public PojoContainingTupleAndWritable(int i, long l1, long l2) {
+                       hadoopFan = new IntWritable(i);
+                       someInt = i;
+                       theTuple = new Tuple2<Long, Long>(l1, l2);
+               }
+       }
+       
+       public static DataSet<PojoContainingTupleAndWritable> 
getPojoContainingTupleAndWritable(ExecutionEnvironment env) {
+               List<PojoContainingTupleAndWritable> data = new 
ArrayList<PojoContainingTupleAndWritable>();
+               data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 
1x
+               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 
5x
+               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+               return env.fromCollection(data);
+       }
+       
+       public static DataSet<Tuple3<Integer,CrazyNested, POJO>> 
getTupleContainingPojos(ExecutionEnvironment env) {
+               List<Tuple3<Integer,CrazyNested, POJO>> data = new 
ArrayList<Tuple3<Integer,CrazyNested, POJO>>();
+               data.add(new Tuple3<Integer,CrazyNested, POJO>(1, new 
CrazyNested("one", "uno", 1L), new POJO(1, "First",10, 100, 1000L, "One", 
10000L) )); // 3x
+               data.add(new Tuple3<Integer,CrazyNested, POJO>(1, new 
CrazyNested("one", "uno", 1L), new POJO(1, "First",10, 100, 1000L, "One", 
10000L) ));
+               data.add(new Tuple3<Integer,CrazyNested, POJO>(1, new 
CrazyNested("one", "uno", 1L), new POJO(1, "First",10, 100, 1000L, "One", 
10000L) ));
+               // Note: the POJO field is not set according to the first two fields; it is identical in all rows.
+               data.add(new Tuple3<Integer,CrazyNested, POJO>(2, new 
CrazyNested("two", "duo", 2L), new POJO(1, "First",10, 100, 1000L, "One", 
10000L) )); // 1x
+               return env.fromCollection(data);
        }
        
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
index 6dedcc1..16267f6 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFie
 import org.apache.flink.types.DoubleValue;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
 
 /**
  * Cross PACT computes the distance of all data points to all cluster

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
index 1925a94..0821af5 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
@@ -24,17 +24,20 @@ import java.util.Collection;
 
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.Program;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
-
+import org.apache.flink.test.localDistributed.PackagedProgramEndToEndITCase;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 
+/**
+ * Test program used by the {@link PackagedProgramEndToEndITCase} test.
+ */
 @SuppressWarnings("serial")
 public class KMeansForTest implements Program {
 
@@ -80,12 +83,8 @@ public class KMeansForTest implements Program {
                        .map(new SelectNearestCenter()).withBroadcastSet(loop, 
"centroids")
                        // count and sum point coordinates for each centroid
                        .map(new CountAppender())
-                       .groupBy(new KeySelector<DummyTuple3IntPointLong, 
Integer>() {
-                               @Override
-                               public Integer getKey(DummyTuple3IntPointLong 
value) throws Exception {
-                                       return value.f0;
-                               }
-                       }).reduce(new CentroidAccumulator())
+                       // test that grouping by a key expression works
+                       .groupBy("field0").reduce(new CentroidAccumulator())
                        // compute new centroids from point counts and 
coordinate sums
                        .map(new CentroidAverager());
 
@@ -228,16 +227,16 @@ public class KMeansForTest implements Program {
 
        // Use this so that we can check whether POJOs and the POJO comparator 
also work
        public static final class DummyTuple3IntPointLong {
-               public Integer f0;
-               public Point f1;
-               public Long f2;
+               public Integer field0;
+               public Point field1;
+               public Long field2;
 
                public DummyTuple3IntPointLong() {}
 
                DummyTuple3IntPointLong(Integer f0, Point f1, Long f2) {
-                       this.f0 = f0;
-                       this.f1 = f1;
-                       this.f2 = f2;
+                       this.field0 = f0;
+                       this.field1 = f1;
+                       this.field2 = f2;
                }
        }
 
@@ -255,7 +254,7 @@ public class KMeansForTest implements Program {
 
                @Override
                public DummyTuple3IntPointLong reduce(DummyTuple3IntPointLong 
val1, DummyTuple3IntPointLong val2) {
-                       return new DummyTuple3IntPointLong(val1.f0, 
val1.f1.add(val2.f1), val1.f2 + val2.f2);
+                       return new DummyTuple3IntPointLong(val1.field0, 
val1.field1.add(val2.field1), val1.field2 + val2.field2);
                }
        }
 
@@ -264,7 +263,7 @@ public class KMeansForTest implements Program {
 
                @Override
                public Centroid map(DummyTuple3IntPointLong value) {
-                       return new Centroid(value.f0, value.f1.div(value.f2));
+                       return new Centroid(value.field0, 
value.field1.div(value.field2));
                }
        }
 }

Reply via email to