Added expression keys to distinct and partition operator and addressed some of 
the comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/aca6fbcd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/aca6fbcd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/aca6fbcd

Branch: refs/heads/master
Commit: aca6fbcd6034cd3d700734f2c50a1084a9c22eb9
Parents: 926f835
Author: Robert Metzger <[email protected]>
Authored: Sun Oct 5 19:30:18 2014 +0200
Committer: Robert Metzger <[email protected]>
Committed: Wed Oct 8 11:39:01 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java |  26 +
 .../flink/api/java/SerializationSpeedTest.java  | 150 ------
 .../api/java/operators/DistinctOperator.java    |   5 +-
 .../api/java/operators/PartitionOperator.java   |   5 +-
 .../flink/api/java/typeutils/TypeExtractor.java |   3 +-
 .../type/extractor/PojoTypeExtractionTest.java  | 488 +++++++++++++++++++
 .../java/type/extractor/TypeExtractorTest.java  | 456 +----------------
 .../runtime/FieldAccessMinibenchmark.java       | 150 ++++++
 .../typeutils/runtime/PojoComparatorTest.java   |  64 +++
 .../typeutils/runtime/PojoContainingTuple.java  |  44 ++
 .../runtime/PojoGenericTypeSerializerTest.java  |  32 ++
 .../test/javaApiOperators/DistinctITCase.java   |  47 +-
 .../test/javaApiOperators/PartitionITCase.java  |  41 +-
 .../util/CollectionDataSets.java                |  13 +
 14 files changed, 915 insertions(+), 609 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 424d30b..dd8c4ba 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -434,6 +434,20 @@ public abstract class DataSet<T> {
        }
        
        /**
+        * Returns a distinct set of a {@link Tuple} {@link DataSet} using 
expression keys.
+        * <p/>
+        * The field position keys specify the fields of Tuples or Pojos on 
which the decision is made if two elements are distinct or
+        * not.
+        * <p/>
+        *
+        * @param fields One or more field positions on which the distinction 
of the DataSet is decided. 
+        * @return A DistinctOperator that represents the distinct DataSet.
+        */
+       public DistinctOperator<T> distinct(String... fields) {
+               return new DistinctOperator<T>(this, new 
Keys.ExpressionKeys<T>(fields, getType()));
+       }
+       
+       /**
         * Returns a distinct set of a {@link Tuple} {@link DataSet} using all 
fields of the tuple.
         * <p/>
         * Note: This operator can only be applied to Tuple DataSets.
@@ -867,6 +881,18 @@ public abstract class DataSet<T> {
        }
        
        /**
+        * Hash-partitions a DataSet on the specified key fields.
+        * <p>
+        * <b>Important:</b>This operation shuffles the whole DataSet over the 
network and can take significant amount of time.
+        * 
+        * @param fields The field expressions on which the DataSet is 
hash-partitioned.
+        * @return The partitioned DataSet.
+        */
+       public PartitionOperator<T> partitionByHash(String... fields) {
+               return new PartitionOperator<T>(this, PartitionMethod.HASH, new 
Keys.ExpressionKeys<T>(fields, getType()));
+       }
+       
+       /**
         * Partitions a DataSet using the specified KeySelector.
         * <p>
         * <b>Important:</b>This operation shuffles the whole DataSet over the 
network and can take significant amount of time.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-java/src/main/java/org/apache/flink/api/java/SerializationSpeedTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/SerializationSpeedTest.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/SerializationSpeedTest.java
deleted file mode 100644
index a81f55d..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/SerializationSpeedTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java;
-
-import java.lang.management.ManagementFactory;
-import java.lang.management.RuntimeMXBean;
-import java.lang.reflect.Field;
-
-public class SerializationSpeedTest {
-
-       static Field wordDescField;
-       static Field wordField;
-       static {
-               try {
-                       wordDescField = WC.class.getField("wordDesc");
-                       wordField = 
ComplexWordDescriptor.class.getField("word");
-               } catch (Exception e) {
-                       e.printStackTrace();
-               }
-       }
-
-       public static class ComplexWordDescriptor {
-               public String word;
-
-               public String getWord() {
-                       return word;
-               }
-       }
-
-       public static class WC {
-               public int count;
-               public ComplexWordDescriptor wordDesc;
-
-               public WC(int c, String s) throws NoSuchFieldException,
-                               SecurityException {
-                       this.count = c;
-                       this.wordDesc = new ComplexWordDescriptor();
-                       this.wordDesc.word = s;
-               }
-
-               public ComplexWordDescriptor getWordDesc() {
-                       return wordDesc;
-               }
-
-       }
-
-       public static int compareCodeGenPublicFields(WC w1, WC w2) {
-               return w1.wordDesc.word.compareTo(w2.wordDesc.word);
-       }
-
-       public static int compareCodeGenMethods(WC w1, WC w2) {
-               return 
w1.getWordDesc().getWord().compareTo(w2.getWordDesc().getWord());
-       }
-
-       public static int compareReflective(WC w1, WC w2)
-                       throws IllegalArgumentException, IllegalAccessException 
{
-               // get String of w1
-               Object wordDesc1 = wordDescField.get(w1);
-               String word2cmp1 = (String) wordField.get(wordDesc1);
-
-               // get String of w2
-               Object wordDesc2 = wordDescField.get(w2);
-               String word2cmp2 = (String) wordField.get(wordDesc2);
-
-               return word2cmp1.compareTo(word2cmp2);
-       }
-
-       /**
-        * results on Core i7 2600k
-        * 
-        * 
-        * warming up Code gen 5019 Reflection 20364 Factor = 4.057382
-        */
-       public static void main(String[] args) throws NoSuchFieldException,
-                       SecurityException, IllegalArgumentException, 
IllegalAccessException {
-               final long RUNS = 1000000000L;
-
-               final RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
-               String jvm = bean.getVmName() + " - " + bean.getVmVendor() + " 
- "
-                               + bean.getSpecVersion() + '/' + 
bean.getVmVersion();
-               System.err.println("Jvm info : " + jvm);
-
-               WC word0 = new WC(14, "Hallo");
-               WC word1 = new WC(3, "Hola");
-
-               System.err.println("warming up");
-               for (long i = 0; i < 100000000; i++) {
-                       compareCodeGenPublicFields(word0, word1);
-                       compareCodeGenMethods(word0, word1);
-                       compareReflective(word0, word1);
-               }
-
-               System.err.println("Code gen public fields");
-               long startTime = System.currentTimeMillis();
-               for (long i = 0; i < RUNS; i++) {
-                       int a = compareCodeGenPublicFields(word0, word1);
-                       if (a == 0) {
-                               System.err.println("hah");
-                       }
-               }
-               long stopTime = System.currentTimeMillis();
-               long elapsedTimeGen = stopTime - startTime;
-               System.err.println(elapsedTimeGen);
-
-               System.err.println("Code gen methods");
-               startTime = System.currentTimeMillis();
-               for (long i = 0; i < RUNS; i++) {
-                       int a = compareCodeGenPublicFields(word0, word1);
-                       if (a == 0) {
-                               System.err.println("hah");
-                       }
-               }
-               stopTime = System.currentTimeMillis();
-               long elapsedTimeGenMethods = stopTime - startTime;
-               System.err.println(elapsedTimeGenMethods);
-
-               System.err.println("Reflection");
-
-               startTime = System.currentTimeMillis();
-               for (long i = 0; i < RUNS; i++) {
-                       int a = compareReflective(word0, word1);
-                       if (a == 0) {
-                               System.err.println("hah");
-                       }
-               }
-               stopTime = System.currentTimeMillis();
-               long elapsedTimeRef = stopTime - startTime;
-               System.err.println(elapsedTimeRef);
-
-               System.err.println("Factor vs public = "
-                               + (elapsedTimeRef / (float) elapsedTimeGen));
-               System.err.println("Factor vs methods = "
-                               + (elapsedTimeRef / (float) 
elapsedTimeGenMethods));
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index 54a65a9..18fd756 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import 
org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
@@ -67,8 +68,8 @@ public class DistinctOperator<T> extends 
SingleInputOperator<T, T, DistinctOpera
                
                
                // FieldPositionKeys can only be applied on Tuples
-               if (keys instanceof Keys.ExpressionKeys && 
!input.getType().isTupleType()) {
-                       throw new InvalidProgramException("Distinction on field 
positions is only possible on tuple data types.");
+               if (keys instanceof Keys.ExpressionKeys && !(input.getType() 
instanceof CompositeType)) {
+                       throw new InvalidProgramException("Distinction on field 
positions is only possible on composite type DataSets.");
                }
                
                this.keys = keys;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
index 532e464..c4548fb 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
 import 
org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
@@ -50,8 +51,8 @@ public class PartitionOperator<T> extends 
SingleInputUdfOperator<T, T, Partition
                        throw new UnsupportedOperationException("Range 
Partitioning not yet supported");
                }
                
-               if(pKeys instanceof Keys.ExpressionKeys<?> && 
!input.getType().isTupleType()) {
-                       throw new IllegalArgumentException("Hash Partitioning 
with key fields only possible on Tuple DataSets");
+               if(pKeys instanceof Keys.ExpressionKeys<?> && !(input.getType() 
instanceof CompositeType) ) {
+                       throw new IllegalArgumentException("Hash Partitioning 
with key fields only possible on Composite-type DataSets");
                }
                
                this.pMethod = pMethod;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index bc92cdd..6231a74 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -399,8 +399,7 @@ public class TypeExtractor {
                }
                // objects with generics are treated as raw type
                else if (t instanceof ParameterizedType) {
-                       return privateGetForClass((Class<OUT>) 
((ParameterizedType) t).getRawType(), new ArrayList<Type>()); // pass new type 
hierarchies here because
-                       // while creating the TH here, we assumed a tuple type.
+                       return privateGetForClass((Class<OUT>) 
((ParameterizedType) t).getRawType(), typeHierarchy);
                }
                // no tuple, no TypeVariable, no generic type
                else if (t instanceof Class) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
new file mode 100644
index 0000000..34dbd99
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.type.extractor;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParserTest.MyWritable;
+import org.apache.flink.api.java.typeutils.WritableTypeInfo;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.collect.HashMultiset;
+
+/**
+ *  Pojo Type tests
+ *  
+ *  A Pojo is a bean-style class with getters, setters and empty ctor
+ *   OR a class with all fields public (or for every private field, there has 
to be a public getter/setter)
+ *   everything else is a generic type (that can't be used for field selection)
+ */
+public class PojoTypeExtractionTest {
+
+       
+       // test with correct pojo types
+       public static class WC { // is a pojo
+               public ComplexNestedClass complex; // is a pojo
+               private int count; // is a BasicType
+
+               public WC() {
+               }
+               public int getCount() {
+                       return count;
+               }
+               public void setCount(int c) {
+                       this.count = c;
+               }
+       }
+       public static class ComplexNestedClass { // pojo
+               public static int ignoreStaticField;
+               public transient int ignoreTransientField;
+               public Date date; // generic type
+               public Integer someNumber; // BasicType
+               public float someFloat; // BasicType
+               public Tuple3<Long, Long, String> word; //Tuple Type with three 
basic types
+               public Object nothing; // generic type
+               public MyWritable hadoopCitizen;  // writableType
+       }
+
+       // all public test
+       public static class AllPublic extends ComplexNestedClass {
+               public ArrayList<String> somethingFancy; // generic type
+               public HashMultiset<Integer> fancyIds; // generic type
+               public String[] fancyArray;                      // generic type
+       }
+       
+       public static class ParentSettingGenerics extends 
PojoWithGenerics<Integer, Long> {
+               public String field3;
+       }
+       public static class PojoWithGenerics<T1, T2> {
+               public int key;
+               public T1 field1;
+               public T2 field2;
+       }
+       
+       public static class ComplexHierarchyTop extends 
ComplexHierarchy<Tuple1<String>> {}
+       public static class ComplexHierarchy<T> extends 
PojoWithGenerics<FromTuple,T> {}
+       
+       // extends from Tuple and adds a field
+       public static class FromTuple extends Tuple3<String, String, Long> {
+               private static final long serialVersionUID = 1L;
+               public int special;
+       }
+       
+       public static class IncorrectPojo {
+               private int isPrivate;
+               public int getIsPrivate() {
+                       return isPrivate;
+               }
+               // setter is missing (intentional)
+       }
+       
+       // correct pojo
+       public static class BeanStylePojo {
+               public String abc;
+               private int field;
+               public int getField() {
+                       return this.field;
+               }
+               public void setField(int f) {
+                       this.field = f;
+               }
+       }
+       public static class WrongCtorPojo {
+               public int a;
+               public WrongCtorPojo(int a) {
+                       this.a = a;
+               }
+       }
+       
+       // in this test, the location of the getters and setters is mixed 
across the type hierarchy.
+       public static class TypedPojoGetterSetterCheck extends 
GenericPojoGetterSetterCheck<String> {
+               public void setPackageProtected(String in) {
+                       this.packageProtected = in;
+               }
+       }
+       public static class GenericPojoGetterSetterCheck<T> {
+               T packageProtected;
+               public T getPackageProtected() {
+                       return packageProtected;
+               }
+       }
+       
+       @Test
+       public void testIncorrectPojos() {
+               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(IncorrectPojo.class);
+               Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>);
+               
+               typeForClass = 
TypeExtractor.createTypeInfo(WrongCtorPojo.class);
+               Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>);
+       }
+       
+       @Test
+       public void testCorrectPojos() {
+               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(BeanStylePojo.class);
+               Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
+               
+               typeForClass = 
TypeExtractor.createTypeInfo(TypedPojoGetterSetterCheck.class);
+               Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
+       }
+       
+       @Test
+       public void testPojoWC() {
+               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(WC.class);
+               checkWCPojoAsserts(typeForClass);
+               
+               WC t = new WC();
+               t.complex = new ComplexNestedClass();
+               TypeInformation<?> typeForObject = 
TypeExtractor.getForObject(t);
+               checkWCPojoAsserts(typeForObject);
+       }
+       
+       private void checkWCPojoAsserts(TypeInformation<?> typeInfo) {
+               Assert.assertFalse(typeInfo.isBasicType());
+               Assert.assertFalse(typeInfo.isTupleType());
+               Assert.assertEquals(9, typeInfo.getTotalFields());
+               Assert.assertTrue(typeInfo instanceof PojoTypeInfo);
+               PojoTypeInfo<?> pojoType = (PojoTypeInfo<?>) typeInfo;
+               
+               List<FlatFieldDescriptor> ffd = new 
ArrayList<FlatFieldDescriptor>();
+               String[] fields = {"count","complex.date", 
"complex.hadoopCitizen", "complex.nothing",
+                               "complex.someFloat", "complex.someNumber", 
"complex.word.f0",
+                               "complex.word.f1", "complex.word.f2"};
+               int[] positions = {8,0,1,2,
+                               3,4,5,
+                               6,7};
+               Assert.assertEquals(fields.length, positions.length);
+               for(int i = 0; i < fields.length; i++) {
+                       pojoType.getKey(fields[i], 0, ffd);
+                       Assert.assertEquals("Too many keys returned", 1, 
ffd.size());
+                       Assert.assertEquals("position of field "+fields[i]+" 
wrong", positions[i], ffd.get(0).getPosition());
+                       ffd.clear();
+               }
+               
+               pojoType.getKey("complex.word.*", 0, ffd);
+               Assert.assertEquals(3, ffd.size());
+               // check if it returns 5,6,7
+               for(FlatFieldDescriptor ffdE : ffd) {
+                       final int pos = ffdE.getPosition();
+                       Assert.assertTrue(pos <= 7 );
+                       Assert.assertTrue(5 <= pos );
+                       if(pos == 5) {
+                               Assert.assertEquals(Long.class, 
ffdE.getType().getTypeClass());
+                       }
+                       if(pos == 6) {
+                               Assert.assertEquals(Long.class, 
ffdE.getType().getTypeClass());
+                       }
+                       if(pos == 7) {
+                               Assert.assertEquals(String.class, 
ffdE.getType().getTypeClass());
+                       }
+               }
+               ffd.clear();
+               
+               
+               pojoType.getKey("complex.*", 0, ffd);
+               Assert.assertEquals(8, ffd.size());
+               // check if it returns 0-7
+               for(FlatFieldDescriptor ffdE : ffd) {
+                       final int pos = ffdE.getPosition();
+                       Assert.assertTrue(ffdE.getPosition() <= 7 );
+                       Assert.assertTrue(0 <= ffdE.getPosition() );
+                       if(pos == 0) {
+                               Assert.assertEquals(Date.class, 
ffdE.getType().getTypeClass());
+                       }
+                       if(pos == 1) {
+                               Assert.assertEquals(MyWritable.class, 
ffdE.getType().getTypeClass());
+                       }
+                       if(pos == 2) {
+                               Assert.assertEquals(Object.class, 
ffdE.getType().getTypeClass());
+                       }
+                       if(pos == 3) {
+                               Assert.assertEquals(Float.class, 
ffdE.getType().getTypeClass());
+                       }
+                       if(pos == 4) {
+                               Assert.assertEquals(Integer.class, 
ffdE.getType().getTypeClass());
+                       }
+                       if(pos == 5) {
+                               Assert.assertEquals(Long.class, 
ffdE.getType().getTypeClass());
+                       }
+                       if(pos == 6) {
+                               Assert.assertEquals(Long.class, 
ffdE.getType().getTypeClass());
+                       }
+                       if(pos == 7) {
+                               Assert.assertEquals(String.class, 
ffdE.getType().getTypeClass());
+                       }
+               }
+               ffd.clear();
+               
+               pojoType.getKey("*", 0, ffd);
+               Assert.assertEquals(9, ffd.size());
+               // check if it returns 0-8
+               for(FlatFieldDescriptor ffdE : ffd) {
+                       Assert.assertTrue(ffdE.getPosition() <= 8 );
+                       Assert.assertTrue(0 <= ffdE.getPosition() );
+                       if(ffdE.getPosition() == 8) {
+                               Assert.assertEquals(Integer.class, 
ffdE.getType().getTypeClass());
+                       }
+               }
+               ffd.clear();
+               
+               TypeInformation<?> typeComplexNested = pojoType.getTypeAt(0); 
// ComplexNestedClass complex
+               Assert.assertTrue(typeComplexNested instanceof PojoTypeInfo);
+               
+               Assert.assertEquals(6, typeComplexNested.getArity());
+               Assert.assertEquals(8, typeComplexNested.getTotalFields());
+               PojoTypeInfo<?> pojoTypeComplexNested = (PojoTypeInfo<?>) 
typeComplexNested;
+               
+               boolean dateSeen = false, intSeen = false, floatSeen = false,
+                               tupleSeen = false, objectSeen = false, 
writableSeen = false;
+               for(int i = 0; i < pojoTypeComplexNested.getArity(); i++) {
+                       PojoField field = 
pojoTypeComplexNested.getPojoFieldAt(i);
+                       String name = field.field.getName();
+                       if(name.equals("date")) {
+                               if(dateSeen) {
+                                       Assert.fail("already seen");
+                               }
+                               dateSeen = true;
+                               Assert.assertEquals(new 
GenericTypeInfo<Date>(Date.class), field.type);
+                               Assert.assertEquals(Date.class, 
field.type.getTypeClass());
+                       } else if(name.equals("someNumber")) {
+                               if(intSeen) {
+                                       Assert.fail("already seen");
+                               }
+                               intSeen = true;
+                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+                               Assert.assertEquals(Integer.class, 
field.type.getTypeClass());
+                       } else if(name.equals("someFloat")) {
+                               if(floatSeen) {
+                                       Assert.fail("already seen");
+                               }
+                               floatSeen = true;
+                               
Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, field.type);
+                               Assert.assertEquals(Float.class, 
field.type.getTypeClass());
+                       } else if(name.equals("word")) {
+                               if(tupleSeen) {
+                                       Assert.fail("already seen");
+                               }
+                               tupleSeen = true;
+                               Assert.assertTrue(field.type instanceof 
TupleTypeInfo<?>);
+                               Assert.assertEquals(Tuple3.class, 
field.type.getTypeClass());
+                               // do some more advanced checks on the tuple
+                               TupleTypeInfo<?> tupleTypeFromComplexNested = 
(TupleTypeInfo<?>) field.type;
+                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, 
tupleTypeFromComplexNested.getTypeAt(0));
+                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, 
tupleTypeFromComplexNested.getTypeAt(1));
+                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
tupleTypeFromComplexNested.getTypeAt(2));
+                       } else if(name.equals("nothing")) {
+                               if(objectSeen) {
+                                       Assert.fail("already seen");
+                               }
+                               objectSeen = true;
+                               Assert.assertEquals(new 
GenericTypeInfo<Object>(Object.class), field.type);
+                               Assert.assertEquals(Object.class, 
field.type.getTypeClass());
+                       } else if(name.equals("hadoopCitizen")) {
+                               if(writableSeen) {
+                                       Assert.fail("already seen");
+                               }
+                               writableSeen = true;
+                               Assert.assertEquals(new 
WritableTypeInfo<MyWritable>(MyWritable.class), field.type);
+                               Assert.assertEquals(MyWritable.class, 
field.type.getTypeClass());
+                       } else {
+                               Assert.fail("field "+field+" is not expected");
+                       }
+               }
+               Assert.assertTrue("Field was not present", dateSeen);
+               Assert.assertTrue("Field was not present", intSeen);
+               Assert.assertTrue("Field was not present", floatSeen);
+               Assert.assertTrue("Field was not present", tupleSeen);
+               Assert.assertTrue("Field was not present", objectSeen);
+               Assert.assertTrue("Field was not present", writableSeen);
+               
+               TypeInformation<?> typeAtOne = pojoType.getTypeAt(1); // int 
count
+               Assert.assertTrue(typeAtOne instanceof BasicTypeInfo);
+               
+               Assert.assertEquals(typeInfo.getTypeClass(), WC.class);
+               Assert.assertEquals(typeInfo.getArity(), 2);
+       }
+       
+       @Test
+       public void testPojoAllPublic() {
+               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(AllPublic.class);
+               checkAllPublicAsserts(typeForClass);
+               
+               TypeInformation<?> typeForObject = 
TypeExtractor.getForObject(new AllPublic() );
+               checkAllPublicAsserts(typeForObject);
+       }
+       
+       private void checkAllPublicAsserts(TypeInformation<?> typeInformation) {
+               Assert.assertTrue(typeInformation instanceof PojoTypeInfo);
+               Assert.assertEquals(9, typeInformation.getArity());
+               Assert.assertEquals(11, typeInformation.getTotalFields());
+               // check if the three additional fields are identified correctly
+               boolean arrayListSeen = false, multisetSeen = false, 
strArraySeen = false;
+               PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeInformation;
+               for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
+                       PojoField field = pojoTypeForClass.getPojoFieldAt(i);
+                       String name = field.field.getName();
+                       if(name.equals("somethingFancy")) {
+                               if(arrayListSeen) {
+                                       Assert.fail("already seen");
+                               }
+                               arrayListSeen = true;
+                               Assert.assertTrue(field.type instanceof 
GenericTypeInfo);
+                               Assert.assertEquals(ArrayList.class, 
field.type.getTypeClass());
+                       } else if(name.equals("fancyIds")) {
+                               if(multisetSeen) {
+                                       Assert.fail("already seen");
+                               }
+                               multisetSeen = true;
+                               Assert.assertTrue(field.type instanceof 
GenericTypeInfo);
+                               Assert.assertEquals(HashMultiset.class, 
field.type.getTypeClass());
+                       } else if(name.equals("fancyArray")) {
+                               if(strArraySeen) {
+                                       Assert.fail("already seen");
+                               }
+                               strArraySeen = true;
+                               
Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.type);
+                               Assert.assertEquals(String[].class, 
field.type.getTypeClass());
+                       } else if(Arrays.asList("date", "someNumber", 
"someFloat", "word", "nothing", "hadoopCitizen").contains(name)) {
+                               // ignore these, they are inherited from the 
ComplexNestedClass
+                       } 
+                       else {
+                               Assert.fail("field "+field+" is not expected");
+                       }
+               }
+               Assert.assertTrue("Field was not present", arrayListSeen);
+               Assert.assertTrue("Field was not present", multisetSeen);
+               Assert.assertTrue("Field was not present", strArraySeen);
+       }
+       
+       @Test
+       public void testPojoExtendingTuple() {
+               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(FromTuple.class);
+               checkFromTuplePojo(typeForClass);
+               
+               FromTuple ft = new FromTuple();
+               ft.f0 = ""; ft.f1 = ""; ft.f2 = 0L;
+               TypeInformation<?> typeForObject = 
TypeExtractor.getForObject(ft);
+               checkFromTuplePojo(typeForObject);
+       }
+       
+       private void checkFromTuplePojo(TypeInformation<?> typeInformation) {
+               Assert.assertTrue(typeInformation instanceof PojoTypeInfo<?>);
+               Assert.assertEquals(4, typeInformation.getTotalFields());
+               PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeInformation;
+               for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
+                       PojoField field = pojoTypeForClass.getPojoFieldAt(i);
+                       String name = field.field.getName();
+                       if(name.equals("special")) {
+                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+                       } else if(name.equals("f0") || name.equals("f1")) {
+                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type);
+                       } else if(name.equals("f2")) {
+                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.type);
+                       } else {
+                               Assert.fail("unexpected field");
+                       }
+               }
+       }
+       
+       @Test
+       public void testPojoWithGenerics() {
+               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(ParentSettingGenerics.class);
+               Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
+               PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeForClass;
+               for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
+                       PojoField field = pojoTypeForClass.getPojoFieldAt(i);
+                       String name = field.field.getName();
+                       if(name.equals("field1")) {
+                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+                       } else if (name.equals("field2")) {
+                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.type);
+                       } else if (name.equals("field3")) {
+                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type);
+                       } else if (name.equals("key")) {
+                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+                       } else {
+                               Assert.fail("Unexpected field "+field);
+                       }
+               }
+       }
+       
+       /**
+        * Test if the TypeExtractor is accepting untyped generics,
+        * making them GenericTypes
+        */
+       @Test
+       @Ignore // kryo needed.
+       public void testPojoWithGenericsSomeFieldsGeneric() {
+               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(PojoWithGenerics.class);
+               Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
+               PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeForClass;
+               for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
+                       PojoField field = pojoTypeForClass.getPojoFieldAt(i);
+                       String name = field.field.getName();
+                       if(name.equals("field1")) {
+                               Assert.assertEquals(new 
GenericTypeInfo<Object>(Object.class), field.type);
+                       } else if (name.equals("field2")) {
+                               Assert.assertEquals(new 
GenericTypeInfo<Object>(Object.class), field.type);
+                       } else if (name.equals("key")) {
+                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+                       } else {
+                               Assert.fail("Unexpected field "+field);
+                       }
+               }
+       }
+       
+       
+       @Test
+       public void testPojoWithComplexHierarchy() {
+               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(ComplexHierarchyTop.class);
+               Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
+               PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeForClass;
+               for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
+                       PojoField field = pojoTypeForClass.getPojoFieldAt(i);
+                       String name = field.field.getName();
+                       if(name.equals("field1")) {
+                               Assert.assertTrue(field.type instanceof 
PojoTypeInfo<?>); // From tuple is pojo (not tuple type!)
+                       } else if (name.equals("field2")) {
+                               Assert.assertTrue(field.type instanceof 
TupleTypeInfo<?>);
+                               Assert.assertTrue( 
((TupleTypeInfo<?>)field.type).getTypeAt(0).equals(BasicTypeInfo.STRING_TYPE_INFO)
 );
+                       } else if (name.equals("key")) {
+                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+                       } else {
+                               Assert.fail("Unexpected field "+field);
+                       }
+               }
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index 60d41d3..a092a53 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -22,8 +22,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.InvalidTypesException;
@@ -46,9 +44,7 @@ import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple9;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
-import org.apache.flink.api.java.typeutils.PojoField;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -67,7 +63,6 @@ import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import com.google.common.collect.HashMultiset;
 
 public class TypeExtractorTest {
 
@@ -350,449 +345,7 @@ public class TypeExtractorTest {
                Assert.assertEquals(ti2.getTypeClass(), CustomType.class);
        }
        
-       //
-       // Pojo Type tests
-       // A Pojo is a bean-style class with getters, setters and empty ctor
-       // OR a class with all fields public (or for every private field, there 
has to be a public getter/setter)
-       // everything else is a generic type (that can't be used for field 
selection)
-       //
-       
-       
-       // test with correct pojo types
-       public static class WC { // is a pojo
-               public ComplexNestedClass complex; // is a pojo
-               private int count; // is a BasicType
-
-               public WC() {
-               }
-               public int getCount() {
-                       return count;
-               }
-               public void setCount(int c) {
-                       this.count = c;
-               }
-       }
-       public static class ComplexNestedClass { // pojo
-               public static int ignoreStaticField;
-               public transient int ignoreTransientField;
-               public Date date; // generic type
-               public Integer someNumber; // BasicType
-               public float someFloat; // BasicType
-               public Tuple3<Long, Long, String> word; //Tuple Type with three 
basic types
-               public Object nothing; // generic type
-               public MyWritable hadoopCitizen;  // writableType
-       }
 
-       // all public test
-       public static class AllPublic extends ComplexNestedClass {
-               public ArrayList<String> somethingFancy; // generic type
-               public HashMultiset<Integer> fancyIds; // generic type
-               public String[] fancyArray;                      // generic type
-       }
-       
-       public static class ParentSettingGenerics extends 
PojoWithGenerics<Integer, Long> {
-               public String field3;
-       }
-       public static class PojoWithGenerics<T1, T2> {
-               public int key;
-               public T1 field1;
-               public T2 field2;
-       }
-       
-       public static class ComplexHierarchyTop extends 
ComplexHierarchy<Tuple1<String>> {}
-       public static class ComplexHierarchy<T> extends 
PojoWithGenerics<FromTuple,T> {}
-       
-       // extends from Tuple and adds a field
-       public static class FromTuple extends Tuple3<String, String, Long> {
-               private static final long serialVersionUID = 1L;
-               public int special;
-       }
-       
-       public static class IncorrectPojo {
-               private int isPrivate;
-               public int getIsPrivate() {
-                       return isPrivate;
-               }
-               // setter is missing (intentional)
-       }
-       
-       // correct pojo
-       public static class BeanStylePojo {
-               public String abc;
-               private int field;
-               public int getField() {
-                       return this.field;
-               }
-               public void setField(int f) {
-                       this.field = f;
-               }
-       }
-       public static class WrongCtorPojo {
-               public int a;
-               public WrongCtorPojo(int a) {
-                       this.a = a;
-               }
-       }
-       
-       // in this test, the location of the getters and setters is mixed 
across the type hierarchy.
-       public static class TypedPojoGetterSetterCheck extends 
GenericPojoGetterSetterCheck<String> {
-               public void setPackageProtected(String in) {
-                       this.packageProtected = in;
-               }
-       }
-       public static class GenericPojoGetterSetterCheck<T> {
-               T packageProtected;
-               public T getPackageProtected() {
-                       return packageProtected;
-               }
-       }
-       
-       @Test
-       public void testIncorrectPojos() {
-               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(IncorrectPojo.class);
-               Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>);
-               
-               typeForClass = 
TypeExtractor.createTypeInfo(WrongCtorPojo.class);
-               Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>);
-       }
-       
-       @Test
-       public void testCorrectPojos() {
-               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(BeanStylePojo.class);
-               Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
-               
-               typeForClass = 
TypeExtractor.createTypeInfo(TypedPojoGetterSetterCheck.class);
-               Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
-       }
-       
-       @Test
-       public void testPojoWC() {
-               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(WC.class);
-               checkWCPojoAsserts(typeForClass);
-               
-               WC t = new WC();
-               t.complex = new ComplexNestedClass();
-               TypeInformation<?> typeForObject = 
TypeExtractor.getForObject(t);
-               checkWCPojoAsserts(typeForObject);
-       }
-       
-       private void checkWCPojoAsserts(TypeInformation<?> typeInfo) {
-               Assert.assertFalse(typeInfo.isBasicType());
-               Assert.assertFalse(typeInfo.isTupleType());
-               Assert.assertEquals(9, typeInfo.getTotalFields());
-               Assert.assertTrue(typeInfo instanceof PojoTypeInfo);
-               PojoTypeInfo<?> pojoType = (PojoTypeInfo<?>) typeInfo;
-               
-               List<FlatFieldDescriptor> ffd = new 
ArrayList<FlatFieldDescriptor>();
-               String[] fields = {"count","complex.date", 
"complex.hadoopCitizen", "complex.nothing",
-                               "complex.someFloat", "complex.someNumber", 
"complex.word.f0",
-                               "complex.word.f1", "complex.word.f2"};
-               int[] positions = {8,0,1,2,
-                               3,4,5,
-                               6,7};
-               Assert.assertEquals(fields.length, positions.length);
-               for(int i = 0; i < fields.length; i++) {
-                       pojoType.getKey(fields[i], 0, ffd);
-                       Assert.assertEquals("Too many keys returned", 1, 
ffd.size());
-                       Assert.assertEquals("position of field "+fields[i]+" 
wrong", positions[i], ffd.get(0).getPosition());
-                       ffd.clear();
-               }
-               
-               pojoType.getKey("complex.word.*", 0, ffd);
-               Assert.assertEquals(3, ffd.size());
-               // check if it returns 5,6,7
-               for(FlatFieldDescriptor ffdE : ffd) {
-                       final int pos = ffdE.getPosition();
-                       Assert.assertTrue(pos <= 7 );
-                       Assert.assertTrue(5 <= pos );
-                       if(pos == 5) {
-                               Assert.assertEquals(Long.class, 
ffdE.getType().getTypeClass());
-                       }
-                       if(pos == 6) {
-                               Assert.assertEquals(Long.class, 
ffdE.getType().getTypeClass());
-                       }
-                       if(pos == 7) {
-                               Assert.assertEquals(String.class, 
ffdE.getType().getTypeClass());
-                       }
-               }
-               ffd.clear();
-               
-               
-               pojoType.getKey("complex.*", 0, ffd);
-               Assert.assertEquals(8, ffd.size());
-               // check if it returns 0-7
-               for(FlatFieldDescriptor ffdE : ffd) {
-                       final int pos = ffdE.getPosition();
-                       Assert.assertTrue(ffdE.getPosition() <= 7 );
-                       Assert.assertTrue(0 <= ffdE.getPosition() );
-                       if(pos == 0) {
-                               Assert.assertEquals(Date.class, 
ffdE.getType().getTypeClass());
-                       }
-                       if(pos == 1) {
-                               Assert.assertEquals(MyWritable.class, 
ffdE.getType().getTypeClass());
-                       }
-                       if(pos == 2) {
-                               Assert.assertEquals(Object.class, 
ffdE.getType().getTypeClass());
-                       }
-                       if(pos == 3) {
-                               Assert.assertEquals(Float.class, 
ffdE.getType().getTypeClass());
-                       }
-                       if(pos == 4) {
-                               Assert.assertEquals(Integer.class, 
ffdE.getType().getTypeClass());
-                       }
-                       if(pos == 5) {
-                               Assert.assertEquals(Long.class, 
ffdE.getType().getTypeClass());
-                       }
-                       if(pos == 6) {
-                               Assert.assertEquals(Long.class, 
ffdE.getType().getTypeClass());
-                       }
-                       if(pos == 7) {
-                               Assert.assertEquals(String.class, 
ffdE.getType().getTypeClass());
-                       }
-               }
-               ffd.clear();
-               
-               pojoType.getKey("*", 0, ffd);
-               Assert.assertEquals(9, ffd.size());
-               // check if it returns 0-8
-               for(FlatFieldDescriptor ffdE : ffd) {
-                       Assert.assertTrue(ffdE.getPosition() <= 8 );
-                       Assert.assertTrue(0 <= ffdE.getPosition() );
-                       if(ffdE.getPosition() == 8) {
-                               Assert.assertEquals(Integer.class, 
ffdE.getType().getTypeClass());
-                       }
-               }
-               ffd.clear();
-               
-               TypeInformation<?> typeComplexNested = pojoType.getTypeAt(0); 
// ComplexNestedClass complex
-               Assert.assertTrue(typeComplexNested instanceof PojoTypeInfo);
-               
-               Assert.assertEquals(6, typeComplexNested.getArity());
-               Assert.assertEquals(8, typeComplexNested.getTotalFields());
-               PojoTypeInfo<?> pojoTypeComplexNested = (PojoTypeInfo<?>) 
typeComplexNested;
-               
-               boolean dateSeen = false, intSeen = false, floatSeen = false,
-                               tupleSeen = false, objectSeen = false, 
writableSeen = false;
-               for(int i = 0; i < pojoTypeComplexNested.getArity(); i++) {
-                       PojoField field = 
pojoTypeComplexNested.getPojoFieldAt(i);
-                       String name = field.field.getName();
-                       if(name.equals("date")) {
-                               if(dateSeen) {
-                                       Assert.fail("already seen");
-                               }
-                               dateSeen = true;
-                               Assert.assertEquals(new 
GenericTypeInfo<Date>(Date.class), field.type);
-                               Assert.assertEquals(Date.class, 
field.type.getTypeClass());
-                       } else if(name.equals("someNumber")) {
-                               if(intSeen) {
-                                       Assert.fail("already seen");
-                               }
-                               intSeen = true;
-                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
-                               Assert.assertEquals(Integer.class, 
field.type.getTypeClass());
-                       } else if(name.equals("someFloat")) {
-                               if(floatSeen) {
-                                       Assert.fail("already seen");
-                               }
-                               floatSeen = true;
-                               
Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, field.type);
-                               Assert.assertEquals(Float.class, 
field.type.getTypeClass());
-                       } else if(name.equals("word")) {
-                               if(tupleSeen) {
-                                       Assert.fail("already seen");
-                               }
-                               tupleSeen = true;
-                               Assert.assertTrue(field.type instanceof 
TupleTypeInfo<?>);
-                               Assert.assertEquals(Tuple3.class, 
field.type.getTypeClass());
-                               // do some more advanced checks on the tuple
-                               TupleTypeInfo<?> tupleTypeFromComplexNested = 
(TupleTypeInfo<?>) field.type;
-                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, 
tupleTypeFromComplexNested.getTypeAt(0));
-                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, 
tupleTypeFromComplexNested.getTypeAt(1));
-                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
tupleTypeFromComplexNested.getTypeAt(2));
-                       } else if(name.equals("nothing")) {
-                               if(objectSeen) {
-                                       Assert.fail("already seen");
-                               }
-                               objectSeen = true;
-                               Assert.assertEquals(new 
GenericTypeInfo<Object>(Object.class), field.type);
-                               Assert.assertEquals(Object.class, 
field.type.getTypeClass());
-                       } else if(name.equals("hadoopCitizen")) {
-                               if(writableSeen) {
-                                       Assert.fail("already seen");
-                               }
-                               writableSeen = true;
-                               Assert.assertEquals(new 
WritableTypeInfo<MyWritable>(MyWritable.class), field.type);
-                               Assert.assertEquals(MyWritable.class, 
field.type.getTypeClass());
-                       } else {
-                               Assert.fail("field "+field+" is not expected");
-                       }
-               }
-               Assert.assertTrue("Field was not present", dateSeen);
-               Assert.assertTrue("Field was not present", intSeen);
-               Assert.assertTrue("Field was not present", floatSeen);
-               Assert.assertTrue("Field was not present", tupleSeen);
-               Assert.assertTrue("Field was not present", objectSeen);
-               Assert.assertTrue("Field was not present", writableSeen);
-               
-               TypeInformation<?> typeAtOne = pojoType.getTypeAt(1); // int 
count
-               Assert.assertTrue(typeAtOne instanceof BasicTypeInfo);
-               
-               Assert.assertEquals(typeInfo.getTypeClass(), WC.class);
-               Assert.assertEquals(typeInfo.getArity(), 2);
-       }
-       
-       @Test
-       public void testPojoAllPublic() {
-               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(AllPublic.class);
-               checkAllPublicAsserts(typeForClass);
-               
-               TypeInformation<?> typeForObject = 
TypeExtractor.getForObject(new AllPublic() );
-               checkAllPublicAsserts(typeForObject);
-       }
-       
-       private void checkAllPublicAsserts(TypeInformation<?> typeInformation) {
-               Assert.assertTrue(typeInformation instanceof PojoTypeInfo);
-               Assert.assertEquals(9, typeInformation.getArity());
-               Assert.assertEquals(11, typeInformation.getTotalFields());
-               // check if the three additional fields are identified correctly
-               boolean arrayListSeen = false, multisetSeen = false, 
strArraySeen = false;
-               PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeInformation;
-               for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
-                       PojoField field = pojoTypeForClass.getPojoFieldAt(i);
-                       String name = field.field.getName();
-                       if(name.equals("somethingFancy")) {
-                               if(arrayListSeen) {
-                                       Assert.fail("already seen");
-                               }
-                               arrayListSeen = true;
-                               Assert.assertTrue(field.type instanceof 
GenericTypeInfo);
-                               Assert.assertEquals(ArrayList.class, 
field.type.getTypeClass());
-                       } else if(name.equals("fancyIds")) {
-                               if(multisetSeen) {
-                                       Assert.fail("already seen");
-                               }
-                               multisetSeen = true;
-                               Assert.assertTrue(field.type instanceof 
GenericTypeInfo);
-                               Assert.assertEquals(HashMultiset.class, 
field.type.getTypeClass());
-                       } else if(name.equals("fancyArray")) {
-                               if(strArraySeen) {
-                                       Assert.fail("already seen");
-                               }
-                               strArraySeen = true;
-                               
Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.type);
-                               Assert.assertEquals(String[].class, 
field.type.getTypeClass());
-                       } else if(Arrays.asList("date", "someNumber", 
"someFloat", "word", "nothing", "hadoopCitizen").contains(name)) {
-                               // ignore these, they are inherited from the 
ComplexNestedClass
-                       } 
-                       else {
-                               Assert.fail("field "+field+" is not expected");
-                       }
-               }
-               Assert.assertTrue("Field was not present", arrayListSeen);
-               Assert.assertTrue("Field was not present", multisetSeen);
-               Assert.assertTrue("Field was not present", strArraySeen);
-       }
-       
-       @Test
-       public void testPojoExtendingTuple() {
-               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(FromTuple.class);
-               checkFromTuplePojo(typeForClass);
-               
-               FromTuple ft = new FromTuple();
-               ft.f0 = ""; ft.f1 = ""; ft.f2 = 0L;
-               TypeInformation<?> typeForObject = 
TypeExtractor.getForObject(ft);
-               checkFromTuplePojo(typeForObject);
-       }
-       
-       private void checkFromTuplePojo(TypeInformation<?> typeInformation) {
-               Assert.assertTrue(typeInformation instanceof PojoTypeInfo<?>);
-               Assert.assertEquals(4, typeInformation.getTotalFields());
-               PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeInformation;
-               for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
-                       PojoField field = pojoTypeForClass.getPojoFieldAt(i);
-                       String name = field.field.getName();
-                       if(name.equals("special")) {
-                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
-                       } else if(name.equals("f0") || name.equals("f1")) {
-                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type);
-                       } else if(name.equals("f2")) {
-                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.type);
-                       } else {
-                               Assert.fail("unexpected field");
-                       }
-               }
-       }
-       
-       @Test
-       public void testPojoWithGenerics() {
-               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(ParentSettingGenerics.class);
-               Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
-               PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeForClass;
-               for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
-                       PojoField field = pojoTypeForClass.getPojoFieldAt(i);
-                       String name = field.field.getName();
-                       if(name.equals("field1")) {
-                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
-                       } else if (name.equals("field2")) {
-                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.type);
-                       } else if (name.equals("field3")) {
-                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type);
-                       } else if (name.equals("key")) {
-                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
-                       } else {
-                               Assert.fail("Unexpected field "+field);
-                       }
-               }
-       }
-       
-       /**
-        * Test if the TypeExtractor is accepting untyped generics,
-        * making them GenericTypes
-        */
-       @Test
-       @Ignore // kryo needed.
-       public void testPojoWithGenericsSomeFieldsGeneric() {
-               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(PojoWithGenerics.class);
-               Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
-               PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeForClass;
-               for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
-                       PojoField field = pojoTypeForClass.getPojoFieldAt(i);
-                       String name = field.field.getName();
-                       if(name.equals("field1")) {
-                               Assert.assertEquals(new 
GenericTypeInfo<Object>(Object.class), field.type);
-                       } else if (name.equals("field2")) {
-                               Assert.assertEquals(new 
GenericTypeInfo<Object>(Object.class), field.type);
-                       } else if (name.equals("key")) {
-                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
-                       } else {
-                               Assert.fail("Unexpected field "+field);
-                       }
-               }
-       }
-       
-       
-       @Test
-       public void testPojoWithComplexHierarchy() {
-               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(ComplexHierarchyTop.class);
-               Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
-               PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeForClass;
-               for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
-                       PojoField field = pojoTypeForClass.getPojoFieldAt(i);
-                       String name = field.field.getName();
-                       if(name.equals("field1")) {
-                               Assert.assertTrue(field.type instanceof 
PojoTypeInfo<?>); // From tuple is pojo (not tuple type!)
-                       } else if (name.equals("field2")) {
-                               Assert.assertTrue(field.type instanceof 
TupleTypeInfo<?>);
-                               Assert.assertTrue( 
((TupleTypeInfo<?>)field.type).getTypeAt(0).equals(BasicTypeInfo.STRING_TYPE_INFO)
 );
-                       } else if (name.equals("key")) {
-                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
-                       } else {
-                               Assert.fail("Unexpected field "+field);
-                       }
-               }
-       }
-       
-       // End of Pojo type tests
        
        public static class CustomType {
                public String myField1;
@@ -1691,20 +1244,21 @@ public class TypeExtractorTest {
                public T myField;
        }
        
+       public static class InType extends MyObject<String> {}
        @SuppressWarnings({ "rawtypes", "unchecked" })
        @Test
        @Ignore
        public void testParamertizedCustomObject() {
-               RichMapFunction<?, ?> function = new 
RichMapFunction<MyObject<String>, MyObject<String>>() {
+               RichMapFunction<?, ?> function = new RichMapFunction<InType, 
MyObject<String>>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
-                       public MyObject<String> map(MyObject<String> value) 
throws Exception {
+                       public MyObject<String> map(InType value) throws 
Exception {
                                return null;
                        }
                };
-               
-               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation) 
TypeInfoParser.parse("org.apache.flink.api.java.type.extractor.TypeExtractorTest$MyObject"));
+               TypeInformation<?> inType = 
TypeExtractor.createTypeInfo(InType.class);
+               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation) inType);
                Assert.assertTrue(ti instanceof PojoTypeInfo);
        }
        

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/FieldAccessMinibenchmark.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/FieldAccessMinibenchmark.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/FieldAccessMinibenchmark.java
new file mode 100644
index 0000000..8a0ec82
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/FieldAccessMinibenchmark.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.lang.reflect.Field;
+
+public class FieldAccessMinibenchmark {
+
+       static Field wordDescField;
+       static Field wordField;
+       static {
+               try {
+                       wordDescField = WC.class.getField("wordDesc");
+                       wordField = 
ComplexWordDescriptor.class.getField("word");
+               } catch (Exception e) {
+                       e.printStackTrace();
+               }
+       }
+
+       public static class ComplexWordDescriptor {
+               public String word;
+
+               public String getWord() {
+                       return word;
+               }
+       }
+
+       public static class WC {
+               public int count;
+               public ComplexWordDescriptor wordDesc;
+
+               public WC(int c, String s) throws NoSuchFieldException,
+                               SecurityException {
+                       this.count = c;
+                       this.wordDesc = new ComplexWordDescriptor();
+                       this.wordDesc.word = s;
+               }
+
+               public ComplexWordDescriptor getWordDesc() {
+                       return wordDesc;
+               }
+
+       }
+
+       public static int compareCodeGenPublicFields(WC w1, WC w2) {
+               return w1.wordDesc.word.compareTo(w2.wordDesc.word);
+       }
+
+       public static int compareCodeGenMethods(WC w1, WC w2) {
+               return 
w1.getWordDesc().getWord().compareTo(w2.getWordDesc().getWord());
+       }
+
+       public static int compareReflective(WC w1, WC w2)
+                       throws IllegalArgumentException, IllegalAccessException 
{
+               // get String of w1
+               Object wordDesc1 = wordDescField.get(w1);
+               String word2cmp1 = (String) wordField.get(wordDesc1);
+
+               // get String of w2
+               Object wordDesc2 = wordDescField.get(w2);
+               String word2cmp2 = (String) wordField.get(wordDesc2);
+
+               return word2cmp1.compareTo(word2cmp2);
+       }
+
+       /**
+        * results on Core i7 2600k
+        * 
+        * 
+        * warming up Code gen 5019 Reflection 20364 Factor = 4.057382
+        */
+       public static void main(String[] args) throws NoSuchFieldException,
+                       SecurityException, IllegalArgumentException, 
IllegalAccessException {
+               final long RUNS = 1000000000L;
+
+               final RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
+               String jvm = bean.getVmName() + " - " + bean.getVmVendor() + " 
- "
+                               + bean.getSpecVersion() + '/' + 
bean.getVmVersion();
+               System.err.println("Jvm info : " + jvm);
+
+               WC word0 = new WC(14, "Hallo");
+               WC word1 = new WC(3, "Hola");
+
+               System.err.println("warming up");
+               for (long i = 0; i < 100000000; i++) {
+                       compareCodeGenPublicFields(word0, word1);
+                       compareCodeGenMethods(word0, word1);
+                       compareReflective(word0, word1);
+               }
+
+               System.err.println("Code gen public fields");
+               long startTime = System.currentTimeMillis();
+               for (long i = 0; i < RUNS; i++) {
+                       int a = compareCodeGenPublicFields(word0, word1);
+                       if (a == 0) {
+                               System.err.println("hah");
+                       }
+               }
+               long stopTime = System.currentTimeMillis();
+               long elapsedTimeGen = stopTime - startTime;
+               System.err.println(elapsedTimeGen);
+
+               System.err.println("Code gen methods");
+               startTime = System.currentTimeMillis();
+               for (long i = 0; i < RUNS; i++) {
+                       int a = compareCodeGenPublicFields(word0, word1);
+                       if (a == 0) {
+                               System.err.println("hah");
+                       }
+               }
+               stopTime = System.currentTimeMillis();
+               long elapsedTimeGenMethods = stopTime - startTime;
+               System.err.println(elapsedTimeGenMethods);
+
+               System.err.println("Reflection");
+
+               startTime = System.currentTimeMillis();
+               for (long i = 0; i < RUNS; i++) {
+                       int a = compareReflective(word0, word1);
+                       if (a == 0) {
+                               System.err.println("hah");
+                       }
+               }
+               stopTime = System.currentTimeMillis();
+               long elapsedTimeRef = stopTime - startTime;
+               System.err.println(elapsedTimeRef);
+
+               System.err.println("Factor vs public = "
+                               + (elapsedTimeRef / (float) elapsedTimeGen));
+               System.err.println("Factor vs methods = "
+                               + (elapsedTimeRef / (float) 
elapsedTimeGenMethods));
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java
new file mode 100644
index 0000000..5cb31ca
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.junit.Assert;
+import org.junit.Ignore;
+
+
+@Ignore // TODO
+public class PojoComparatorTest extends 
ComparatorTestBase<PojoContainingTuple> {
+       TypeInformation<PojoContainingTuple> type = 
TypeExtractor.getForClass(PojoContainingTuple.class);
+       
+       PojoContainingTuple[] data = new PojoContainingTuple[]{
+               new PojoContainingTuple(1, 1L, 1L),
+               new PojoContainingTuple(2, 2L, 2L),
+               new PojoContainingTuple(8519, 85190L, 85190L),
+               new PojoContainingTuple(-51498, 85191L, 85191L),
+       };
+
+       @Override
+       protected TypeComparator<PojoContainingTuple> createComparator(boolean 
ascending) {
+               Assert.assertTrue(type instanceof CompositeType);
+               CompositeType<PojoContainingTuple> cType = 
(CompositeType<PojoContainingTuple>) type;
+               ExpressionKeys<PojoContainingTuple> keys = new 
ExpressionKeys<PojoContainingTuple>(new String[] {"theTuple.*"}, cType);
+               boolean[] orders = new boolean[keys.getNumberOfKeyFields()];
+               Arrays.fill(orders, true);
+               return 
cType.createComparator(keys.computeLogicalKeyPositions(), orders, 0);
+       }
+
+       @Override
+       protected TypeSerializer<PojoContainingTuple> createSerializer() {
+               return type.createSerializer();
+       }
+
+       @Override
+       protected PojoContainingTuple[] getSortedTestData() {
+               return data;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoContainingTuple.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoContainingTuple.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoContainingTuple.java
new file mode 100644
index 0000000..ca17ee0
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoContainingTuple.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ * This file belongs to the PojoComparatorTest test
+ *
+ */
+public class PojoContainingTuple {
+       public int someInt;
+       public String someString = "abc";
+       public Tuple2<Long, Long> theTuple;
+       public PojoContainingTuple() {}
+       public PojoContainingTuple(int i, long l1, long l2) {
+               someInt = i;
+               theTuple = new Tuple2<Long, Long>(l1, l2);
+       }
+       
+       @Override
+       public boolean equals(Object obj) {
+               if(obj instanceof PojoContainingTuple) {
+                       PojoContainingTuple other = (PojoContainingTuple) obj;
+                       return someInt == other.someInt && 
theTuple.equals(other.theTuple);
+               }
+               return false;
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java
new file mode 100644
index 0000000..a176178
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+public class PojoGenericTypeSerializerTest extends 
AbstractGenericTypeSerializerTest {
+       
+       @Override
+       protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
+               TypeInformation<T> typeInfo = TypeExtractor.getForClass(type);
+               return typeInfo.createSerializer();
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
index 69bab75..54f1b58 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
@@ -25,6 +25,7 @@ import java.util.LinkedList;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -33,6 +34,7 @@ import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -42,7 +44,7 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class DistinctITCase extends JavaProgramTestBase {
        
-       private static int NUM_PROGRAMS = 6;
+       private static int NUM_PROGRAMS = 8;
        
        private int curProgId = config.getInteger("ProgramId", -1);
        private String resultPath;
@@ -237,6 +239,49 @@ public class DistinctITCase extends JavaProgramTestBase {
                                                "5,2\n" +
                                                "5,3\n";
                        }
+                       case 7: {
+                               
+                               /*
+                                * check correctness of distinct on tuples with 
field expressions
+                                */
+                               
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               
+                               DataSet<Tuple5<Integer, Long,  Integer, String, 
Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+                               DataSet<Tuple1<Integer>> reduceDs = ds.union(ds)
+                                               
.distinct("f0").project(0).types(Integer.class);
+                               
+                               reduceDs.writeAsCsv(resultPath);
+                               env.execute();
+                               
+                               // return expected result
+                               return "1\n" +
+                                               "2\n";
+                                                               
+                       }
+                       case 8: {
+                               
+                               /*
+                                * check correctness of distinct on Pojos
+                                */
+                               
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               
+                               DataSet<POJO> ds = 
CollectionDataSets.getDuplicatePojoDataSet(env);
+                               DataSet<Integer> reduceDs = 
ds.distinct("nestedPojo.longNumber").map(new 
MapFunction<CollectionDataSets.POJO, Integer>() {
+                                       @Override
+                                       public Integer map(POJO value) throws 
Exception {
+                                               return (int) 
value.nestedPojo.longNumber;
+                                       }
+                               });
+                               
+                               reduceDs.writeAsText(resultPath);
+                               env.execute();
+                               
+                               // return expected result
+                               return "10000\n20000\n30000\n";
+                                                               
+                       }
                        default: 
                                throw new IllegalArgumentException("Invalid 
program id");
                        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
index bf1d404..d3c87fa 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
 import org.junit.runner.RunWith;
@@ -45,7 +46,7 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class PartitionITCase extends JavaProgramTestBase {
        
-       private static int NUM_PROGRAMS = 3;
+       private static int NUM_PROGRAMS = 4;
        
        private int curProgId = config.getInteger("ProgramId", -1);
        private String resultPath;
@@ -224,6 +225,29 @@ public class PartitionITCase extends JavaProgramTestBase {
                                                "5\n" +
                                                "6\n";
                        }
+                       case 4: {
+                               /*
+                                * Test hash partition with key expression
+                                */
+               
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               env.setDegreeOfParallelism(3);
+                               
+                               DataSet<POJO> ds = 
CollectionDataSets.getDuplicatePojoDataSet(env);
+                               DataSet<Long> uniqLongs = ds
+                                               
.partitionByHash("nestedPojo.longNumber").setParallelism(4)
+                                               .mapPartition(new 
UniqueNestedPojoLongMapper());
+                               uniqLongs.writeAsText(resultPath);
+                               
+                               env.execute();
+                               
+                               // return expected result
+                               return  "10000\n" +
+                                               "20000\n" +
+                                               "30000\n";
+                       }
+                       
+                       
                        
                        default: 
                                throw new IllegalArgumentException("Invalid 
program id");
@@ -246,6 +270,21 @@ public class PartitionITCase extends JavaProgramTestBase {
                }
        }
        
+       public static class UniqueNestedPojoLongMapper implements 
MapPartitionFunction<POJO, Long> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void mapPartition(Iterable<POJO> records, 
Collector<Long> out) throws Exception {
+                       HashSet<Long> uniq = new HashSet<Long>();
+                       for(POJO t : records) {
+                               uniq.add(t.nestedPojo.longNumber);
+                       }
+                       for(Long l : uniq) {
+                               out.collect(l);
+                       }
+               }
+       }
+       
        public static class PartitionIndexMapper extends RichMapFunction<Long, 
Tuple2<Integer, Integer>> {
                private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index b657545..d023287 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -278,6 +278,19 @@ public class CollectionDataSets {
                return env.fromCollection(data);
        }
        
+       public static DataSet<POJO> 
getDuplicatePojoDataSet(ExecutionEnvironment env) {
+               List<POJO> data = new ArrayList<POJO>();
+               data.add(new POJO(1, "First",10, 100, 1000L, "One", 10000L)); 
// 5x
+               data.add(new POJO(1, "First",10, 100, 1000L, "One", 10000L));
+               data.add(new POJO(1, "First",10, 100, 1000L, "One", 10000L));
+               data.add(new POJO(1, "First",10, 100, 1000L, "One", 10000L));
+               data.add(new POJO(1, "First",10, 100, 1000L, "One", 10000L));
+               data.add(new POJO(2, "Second",20, 200, 2000L, "Two", 20000L));
+               data.add(new POJO(3, "Third",30, 300, 3000L, "Three", 30000L)); 
// 2x
+               data.add(new POJO(3, "Third",30, 300, 3000L, "Three", 30000L));
+               return env.fromCollection(data);
+       }
+       
        public static class POJO {
                public int number;
                public String str;

Reply via email to