http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 9ecb9a2..56f90f4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -32,7 +32,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import 
org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.Keys.FieldPositionKeys;
+import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import 
org.apache.flink.api.java.operators.translation.PlanBothUnwrappingCoGroupOperator;
 import 
org.apache.flink.api.java.operators.translation.PlanLeftUnwrappingCoGroupOperator;
@@ -42,6 +43,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
+
 /**
  * A {@link DataSet} that is the result of a CoGroup transformation. 
  * 
@@ -74,16 +76,16 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 
                // sanity check solution set key mismatches
                if (input1 instanceof SolutionSetPlaceHolder) {
-                       if (keys1 instanceof FieldPositionKeys) {
-                               int[] positions = ((FieldPositionKeys<?>) 
keys1).computeLogicalKeyPositions();
+                       if (keys1 instanceof ExpressionKeys) {
+                               int[] positions = ((ExpressionKeys<?>) 
keys1).computeLogicalKeyPositions();
                                ((SolutionSetPlaceHolder<?>) 
input1).checkJoinKeyFields(positions);
                        } else {
                                throw new InvalidProgramException("Currently, 
the solution set may only be CoGrouped with using tuple field positions.");
                        }
                }
                if (input2 instanceof SolutionSetPlaceHolder) {
-                       if (keys2 instanceof FieldPositionKeys) {
-                               int[] positions = ((FieldPositionKeys<?>) 
keys2).computeLogicalKeyPositions();
+                       if (keys2 instanceof ExpressionKeys) {
+                               int[] positions = ((ExpressionKeys<?>) 
keys2).computeLogicalKeyPositions();
                                ((SolutionSetPlaceHolder<?>) 
input2).checkJoinKeyFields(positions);
                        } else {
                                throw new InvalidProgramException("Currently, 
the solution set may only be CoGrouped with using tuple field positions.");
@@ -108,10 +110,10 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
        protected 
org.apache.flink.api.common.operators.base.CoGroupOperatorBase<?, ?, OUT, ?> 
translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
                
                String name = getName() != null ? getName() : 
function.getClass().getName();
-
-               if (!keys1.areCompatibale(keys2)) {
-                       throw new InvalidProgramException("The types of the key 
fields do not match. Left:" +
-                                       " " + keys1 + " Right: " + keys2);
+               try {
+                       keys1.areCompatible(keys2);
+               } catch (IncompatibleKeysException e) {
+                       throw new InvalidProgramException("The types of the key 
fields do not match.", e);
                }
 
                if (keys1 instanceof Keys.SelectorFunctionKeys
@@ -166,15 +168,13 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 
                        return po;
                }
-               else if ((keys1 instanceof Keys.FieldPositionKeys
-                               && keys2 instanceof Keys.FieldPositionKeys) ||
-                               ((keys1 instanceof Keys.ExpressionKeys
-                                               && keys2 instanceof 
Keys.ExpressionKeys)))
+               else if ( keys1 instanceof Keys.ExpressionKeys && keys2 
instanceof Keys.ExpressionKeys)
                        {
-
-                               if (!keys1.areCompatibale(keys2)) {
-                                       throw new InvalidProgramException("The 
types of the key fields do not match.");
-                               }
+                       try {
+                               keys1.areCompatible(keys2);
+                       } catch (IncompatibleKeysException e) {
+                               throw new InvalidProgramException("The types of 
the key fields do not match.", e);
+                       }
 
                        int[] logicalKeyPositions1 = 
keys1.computeLogicalKeyPositions();
                        int[] logicalKeyPositions2 = 
keys2.computeLogicalKeyPositions();
@@ -364,7 +364,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
                 * @see DataSet
                 */
                public CoGroupOperatorSetsPredicate where(int... fields) {
-                       return new CoGroupOperatorSetsPredicate(new 
Keys.FieldPositionKeys<I1>(fields, input1.getType()));
+                       return new CoGroupOperatorSetsPredicate(new 
Keys.ExpressionKeys<I1>(fields, input1.getType()));
                }
 
                /**
@@ -380,9 +380,9 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
                 * @see Tuple
                 * @see DataSet
                 */
-//             public CoGroupOperatorSetsPredicate where(String... fields) {
-//                     return new CoGroupOperatorSetsPredicate(new 
Keys.ExpressionKeys<I1>(fields, input1.getType()));
-//             }
+               public CoGroupOperatorSetsPredicate where(String... fields) {
+                       return new CoGroupOperatorSetsPredicate(new 
Keys.ExpressionKeys<I1>(fields, input1.getType()));
+               }
 
                /**
                 * Continues a CoGroup transformation and defines a {@link 
KeySelector} function for the first co-grouped {@link DataSet}.</br>
@@ -436,7 +436,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
                         *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.common.functions.CoGroupFunction)}
 to finalize the CoGroup transformation.
                         */
                        public CoGroupOperatorWithoutFunction equalTo(int... 
fields) {
-                               return createCoGroupOperator(new 
Keys.FieldPositionKeys<I2>(fields, input2.getType()));
+                               return createCoGroupOperator(new 
Keys.ExpressionKeys<I2>(fields, input2.getType()));
                        }
 
                        /**
@@ -448,9 +448,9 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
                         * @return An incomplete CoGroup transformation.
                         *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.common.functions.CoGroupFunction)}
 to finalize the CoGroup transformation.
                         */
-//                     public CoGroupOperatorWithoutFunction equalTo(String... 
fields) {
-//                             return createCoGroupOperator(new 
Keys.ExpressionKeys<I2>(fields, input2.getType()));
-//                     }
+                       public CoGroupOperatorWithoutFunction equalTo(String... 
fields) {
+                               return createCoGroupOperator(new 
Keys.ExpressionKeys<I2>(fields, input2.getType()));
+                       }
 
                        /**
                         * Continues a CoGroup transformation and defines a 
{@link KeySelector} function for the second co-grouped {@link DataSet}.</br>
@@ -480,9 +480,10 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
                                if (keys2.isEmpty()) {
                                        throw new InvalidProgramException("The 
co-group keys must not be empty.");
                                }
-
-                               if (!keys1.areCompatibale(keys2)) {
-                                       throw new InvalidProgramException("The 
pair of co-group keys are not compatible with each other.");
+                               try {
+                                       keys1.areCompatible(keys2);
+                               } catch(IncompatibleKeysException ike) {
+                                       throw new InvalidProgramException("The 
pair of co-group keys are not compatible with each other.", ike);
                                }
 
                                return new 
CoGroupOperatorWithoutFunction(keys2);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index 18cb8f6..54a65a9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -58,7 +58,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
                                for(int i = 0; i < tupleType.getArity(); i++) {
                                        allFields[i] = i;
                                }
-                               keys = new Keys.FieldPositionKeys<T>(allFields, 
input.getType(), true);
+                               keys = new Keys.ExpressionKeys<T>(allFields, 
input.getType(), true);
                        }
                        else {
                                throw new InvalidProgramException("Distinction 
on all fields is only possible on tuple data types.");
@@ -67,7 +67,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
                
                
                // FieldPositionKeys can only be applied on Tuples
-               if (keys instanceof Keys.FieldPositionKeys && 
!input.getType().isTupleType()) {
+               if (keys instanceof Keys.ExpressionKeys && 
!input.getType().isTupleType()) {
                        throw new InvalidProgramException("Distinction on field 
positions is only possible on tuple data types.");
                }
                
@@ -81,7 +81,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 
                String name = function.getClass().getName();
                
-               if (keys instanceof Keys.FieldPositionKeys) {
+               if (keys instanceof Keys.ExpressionKeys) {
 
                        int[] logicalKeyPositions = 
keys.computeLogicalKeyPositions();
                        UnaryOperatorInformation<T, T> operatorInfo = new 
UnaryOperatorInformation<T, T>(getInputType(), getResultType());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index edae3c8..cbcc367 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -139,7 +139,8 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
                        
                        return po;
                }
-               else if (grouper.getKeys() instanceof Keys.FieldPositionKeys) {
+               else if (grouper.getKeys() instanceof Keys.ExpressionKeys) { 
//was field position key
+                       //TODO ask stephan
 
                        int[] logicalKeyPositions = 
grouper.getKeys().computeLogicalKeyPositions();
                        UnaryOperatorInformation<IN, OUT> operatorInfo = new 
UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
@@ -166,19 +167,19 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
                        
                        return po;
                }
-               else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
-
-                       int[] logicalKeyPositions = 
grouper.getKeys().computeLogicalKeyPositions();
-                       UnaryOperatorInformation<IN, OUT> operatorInfo = new 
UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
-                       GroupReduceOperatorBase<IN, OUT, 
GroupReduceFunction<IN, OUT>> po =
-                                       new GroupReduceOperatorBase<IN, OUT, 
GroupReduceFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, 
name);
-
-                       po.setCombinable(combinable);
-                       po.setInput(input);
-                       po.setDegreeOfParallelism(this.getParallelism());
-                       
-                       return po;
-               }
+//             else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
+//
+//                     int[] logicalKeyPositions = 
grouper.getKeys().computeLogicalKeyPositions();
+//                     UnaryOperatorInformation<IN, OUT> operatorInfo = new 
UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
+//                     GroupReduceOperatorBase<IN, OUT, 
GroupReduceFunction<IN, OUT>> po =
+//                                     new GroupReduceOperatorBase<IN, OUT, 
GroupReduceFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, 
name);
+//
+//                     po.setCombinable(combinable);
+//                     po.setInput(input);
+//                     po.setDegreeOfParallelism(this.getParallelism());
+//
+//                     return po;
+//             }
                else {
                        throw new UnsupportedOperationException("Unrecognized 
key type.");
                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index bc35c14..caa27dc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatJoinFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.Operator;
@@ -33,23 +34,22 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
-import 
org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
-import org.apache.flink.api.common.functions.RichFlatJoinFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.operators.Keys.FieldPositionKeys;
+import 
org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
+import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import 
org.apache.flink.api.java.operators.translation.PlanBothUnwrappingJoinOperator;
 import 
org.apache.flink.api.java.operators.translation.PlanLeftUnwrappingJoinOperator;
 import 
org.apache.flink.api.java.operators.translation.PlanRightUnwrappingJoinOperator;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
 //CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
 import org.apache.flink.api.java.tuple.*;
-import org.apache.flink.util.Collector;
 //CHECKSTYLE.ON: AvoidStarImport
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Collector;
 
 /**
  * A {@link DataSet} that is the result of a Join transformation. 
@@ -124,16 +124,16 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 
                // sanity check solution set key mismatches
                if (input1 instanceof SolutionSetPlaceHolder) {
-                       if (keys1 instanceof FieldPositionKeys) {
-                               int[] positions = ((FieldPositionKeys<?>) 
keys1).computeLogicalKeyPositions();
+                       if (keys1 instanceof ExpressionKeys) {
+                               int[] positions = ((ExpressionKeys<?>) 
keys1).computeLogicalKeyPositions();
                                ((SolutionSetPlaceHolder<?>) 
input1).checkJoinKeyFields(positions);
                        } else {
                                throw new InvalidProgramException("Currently, 
the solution set may only be joined with using tuple field positions.");
                        }
                }
                if (input2 instanceof SolutionSetPlaceHolder) {
-                       if (keys2 instanceof FieldPositionKeys) {
-                               int[] positions = ((FieldPositionKeys<?>) 
keys2).computeLogicalKeyPositions();
+                       if (keys2 instanceof ExpressionKeys) {
+                               int[] positions = ((ExpressionKeys<?>) 
keys2).computeLogicalKeyPositions();
                                ((SolutionSetPlaceHolder<?>) 
input2).checkJoinKeyFields(positions);
                        } else {
                                throw new InvalidProgramException("Currently, 
the solution set may only be joined with using tuple field positions.");
@@ -247,12 +247,12 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
                protected JoinOperatorBase<?, ?, OUT, ?> translateToDataFlow(
                                Operator<I1> input1,
                                Operator<I2> input2) {
-                       
-                       String name = getName() != null ? getName() : 
function.getClass().getName();
 
-                       if (!keys1.areCompatibale(keys2)) {
-                               throw new InvalidProgramException("The types of 
the key fields do not match. Left:" +
-                                               " " + keys1 + " Right: " + 
keys2);
+                       String name = getName() != null ? getName() : 
function.getClass().getName();
+                       try {
+                               keys1.areCompatible(super.keys2);
+                       } catch(IncompatibleKeysException ike) {
+                               throw new InvalidProgramException("The types of 
the key fields do not match.", ike);
                        }
 
                        if (keys1 instanceof Keys.SelectorFunctionKeys
@@ -315,10 +315,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 
                                return po;
                        }
-                       else if ((super.keys1 instanceof Keys.FieldPositionKeys
-                                               && super.keys2 instanceof 
Keys.FieldPositionKeys) ||
-                                       ((super.keys1 instanceof 
Keys.ExpressionKeys
-                                                       && super.keys2 
instanceof Keys.ExpressionKeys)))
+                       else if (super.keys1 instanceof Keys.ExpressionKeys && 
super.keys2 instanceof Keys.ExpressionKeys)
                        {
                                // Neither side needs the tuple 
wrapping/unwrapping
 
@@ -765,7 +762,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
                 * @see DataSet
                 */
                public JoinOperatorSetsPredicate where(int... fields) {
-                       return new JoinOperatorSetsPredicate(new 
Keys.FieldPositionKeys<I1>(fields, input1.getType()));
+                       return new JoinOperatorSetsPredicate(new 
Keys.ExpressionKeys<I1>(fields, input1.getType()));
                }
 
                /**
@@ -782,9 +779,9 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
                 * @see Tuple
                 * @see DataSet
                 */
-//             public JoinOperatorSetsPredicate where(String... fields) {
-//                     return new JoinOperatorSetsPredicate(new 
Keys.ExpressionKeys<I1>(fields, input1.getType()));
-//             }
+               public JoinOperatorSetsPredicate where(String... fields) {
+                       return new JoinOperatorSetsPredicate(new 
Keys.ExpressionKeys<I1>(fields, input1.getType()));
+               }
                
                /**
                 * Continues a Join transformation and defines a {@link 
KeySelector} function for the first join {@link DataSet}.</br>
@@ -843,7 +840,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
                         * @return A DefaultJoin that represents the joined 
DataSet.
                         */
                        public DefaultJoin<I1, I2> equalTo(int... fields) {
-                               return createJoinOperator(new 
Keys.FieldPositionKeys<I2>(fields, input2.getType()));
+                               return createJoinOperator(new 
Keys.ExpressionKeys<I2>(fields, input2.getType()));
                        }
 
                        /**
@@ -857,9 +854,9 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
                         * @param fields The fields of the second join DataSet 
that should be used as keys.
                         * @return A DefaultJoin that represents the joined 
DataSet.
                         */
-//                     public DefaultJoin<I1, I2> equalTo(String... fields) {
-//                             return createJoinOperator(new 
Keys.ExpressionKeys<I2>(fields, input2.getType()));
-//                     }
+                       public DefaultJoin<I1, I2> equalTo(String... fields) {
+                               return createJoinOperator(new 
Keys.ExpressionKeys<I2>(fields, input2.getType()));
+                       }
 
                        /**
                         * Continues a Join transformation and defines a {@link 
KeySelector} function for the second join {@link DataSet}.</br>
@@ -887,8 +884,10 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
                                        throw new InvalidProgramException("The 
join keys may not be empty.");
                                }
                                
-                               if (!keys1.areCompatibale(keys2)) {
-                                       throw new InvalidProgramException("The 
pair of join keys are not compatible with each other.");
+                               try {
+                                       keys1.areCompatible(keys2);
+                               } catch (IncompatibleKeysException e) {
+                                       throw new InvalidProgramException("The 
pair of join keys are not compatible with each other.",e);
                                }
 
                                return new DefaultJoin<I1, I2>(input1, input2, 
keys1, keys2, joinHint);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index b19d5c3..653a04a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -18,116 +18,53 @@
 
 package org.apache.flink.api.java.operators;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
 
 import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
 
-public abstract class Keys<T> {
 
+public abstract class Keys<T> {
+       private static final Logger LOG = LoggerFactory.getLogger(Keys.class);
 
        public abstract int getNumberOfKeyFields();
 
        public boolean isEmpty() {
                return getNumberOfKeyFields() == 0;
        }
-
-       public abstract boolean areCompatibale(Keys<?> other);
-
+       
+       /**
+        * Check if two sets of keys are compatible to each other (matching 
types, key counts)
+        */
+       public abstract boolean areCompatible(Keys<?> other) throws 
IncompatibleKeysException;
+       
        public abstract int[] computeLogicalKeyPositions();
-
+       
+       
        // 
--------------------------------------------------------------------------------------------
-       //  Specializations for field indexed / expression-based / 
extractor-based grouping
+       //  Specializations for expression-based / extractor-based grouping
        // 
--------------------------------------------------------------------------------------------
-
-       public static class FieldPositionKeys<T> extends Keys<T> {
-
-               private final int[] fieldPositions;
-               private final TypeInformation<?>[] types;
-
-               public FieldPositionKeys(int[] groupingFields, 
TypeInformation<T> type) {
-                       this(groupingFields, type, false);
-               }
-
-               public FieldPositionKeys(int[] groupingFields, 
TypeInformation<T> type, boolean allowEmpty) {
-                       if (!type.isTupleType()) {
-                               throw new InvalidProgramException("Specifying 
keys via field positions is only valid" +
-                                               "for tuple data types. Type: " 
+ type);
-                       }
-
-                       if (!allowEmpty && (groupingFields == null || 
groupingFields.length == 0)) {
-                               throw new IllegalArgumentException("The 
grouping fields must not be empty.");
-                       }
-
-                       TupleTypeInfoBase<?> tupleType = 
(TupleTypeInfoBase<?>)type;
-
-                       this.fieldPositions = makeFields(groupingFields, 
(TupleTypeInfoBase<?>) type);
-
-                       types = new TypeInformation[this.fieldPositions.length];
-                       for(int i = 0; i < this.fieldPositions.length; i++) {
-                               types[i] = 
tupleType.getTypeAt(this.fieldPositions[i]);
-                       }
-
-               }
-
-               @Override
-               public int getNumberOfKeyFields() {
-                       return this.fieldPositions.length;
-               }
-
-               @Override
-               public boolean areCompatibale(Keys<?> other) {
-
-                       if (other instanceof FieldPositionKeys) {
-                               FieldPositionKeys<?> oKey = 
(FieldPositionKeys<?>) other;
-
-                               if(oKey.types.length != this.types.length) {
-                                       return false;
-                               }
-                               for(int i=0; i<this.types.length; i++) {
-                                       
if(!this.types[i].equals(oKey.types[i])) {
-                                               return false;
-                                       }
-                               }
-                               return true;
-
-                       } else if (other instanceof SelectorFunctionKeys) {
-                               if(this.types.length != 1) {
-                                       return false;
-                               }
-
-                               SelectorFunctionKeys<?, ?> sfk = 
(SelectorFunctionKeys<?, ?>) other;
-
-                               return sfk.keyType.equals(this.types[0]);
-                       }
-                       else {
-                               return false;
-                       }
-               }
-
-               @Override
-               public int[] computeLogicalKeyPositions() {
-                       return this.fieldPositions;
-               }
-
-               @Override
-               public String toString() {
-                       String fieldsString = Arrays.toString(fieldPositions);
-                       String typesString = Arrays.toString(types);
-                       return "Tuple position key (Fields: " + fieldsString + 
" Types: " + typesString + ")";
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
+       
+       
        public static class SelectorFunctionKeys<T, K> extends Keys<T> {
 
                private final KeySelector<T, K> keyExtractor;
                private final TypeInformation<K> keyType;
+               private final int[] logicalKeyFields;
 
                public SelectorFunctionKeys(KeySelector<T, K> keyExtractor, 
TypeInformation<T> inputType, TypeInformation<K> keyType) {
                        if (keyExtractor == null) {
@@ -136,6 +73,15 @@ public abstract class Keys<T> {
 
                        this.keyExtractor = keyExtractor;
                        this.keyType = keyType;
+                       
+                       // we have to handle a special case here:
+                       // if the keyType is a tuple type, we need to select 
the full tuple with all its fields.
+                       if(keyType.isTupleType()) {
+                               ExpressionKeys<K> ek = new 
ExpressionKeys<K>(new String[] {ExpressionKeys.SELECT_ALL_CHAR}, keyType);
+                               logicalKeyFields = 
ek.computeLogicalKeyPositions();
+                       } else {
+                               logicalKeyFields = new int[] {0};
+                       }
 
                        if (!this.keyType.isKeyType()) {
                                throw new IllegalArgumentException("Invalid 
type of KeySelector keys");
@@ -152,35 +98,53 @@ public abstract class Keys<T> {
 
                @Override
                public int getNumberOfKeyFields() {
-                       return 1;
+                       return logicalKeyFields.length;
                }
 
                @Override
-               public boolean areCompatibale(Keys<?> other) {
-
+               public boolean areCompatible(Keys<?> other) throws 
IncompatibleKeysException {
+                       
                        if (other instanceof SelectorFunctionKeys) {
                                @SuppressWarnings("unchecked")
                                SelectorFunctionKeys<?, K> sfk = 
(SelectorFunctionKeys<?, K>) other;
 
                                return sfk.keyType.equals(this.keyType);
                        }
-                       else if (other instanceof FieldPositionKeys) {
-                               FieldPositionKeys<?> fpk = 
(FieldPositionKeys<?>) other;
-
-                               if(fpk.types.length != 1) {
-                                       return false;
+                       else if (other instanceof ExpressionKeys) {
+                               ExpressionKeys<?> expressionKeys = 
(ExpressionKeys<?>) other;
+                               
+                               if(keyType.isTupleType()) {
+                                       // special case again: a tuple key type is compared field by field with the expression keys
+                                       TupleTypeInfo<?> tupleKeyType = 
(TupleTypeInfo<?>) keyType;
+                                       List<FlatFieldDescriptor> keyTypeFields 
= new ArrayList<FlatFieldDescriptor>(tupleKeyType.getTotalFields());
+                                       
tupleKeyType.getKey(ExpressionKeys.SELECT_ALL_CHAR, 0, keyTypeFields);
+                                       if(expressionKeys.keyFields.size() != 
keyTypeFields.size()) {
+                                               throw new 
IncompatibleKeysException(IncompatibleKeysException.SIZE_MISMATCH_MESSAGE);
+                                       }
+                                       for(int i=0; i < 
expressionKeys.keyFields.size(); i++) {
+                                               
if(!expressionKeys.keyFields.get(i).getType().equals(keyTypeFields.get(i).getType()))
 {
+                                                       throw new 
IncompatibleKeysException(expressionKeys.keyFields.get(i).getType(), 
keyTypeFields.get(i).getType() );
+                                               }
+                                       }
+                                       return true;
                                }
-
-                               return fpk.types[0].equals(this.keyType);
-                       }
-                       else {
-                               return false;
+                               if(expressionKeys.getNumberOfKeyFields() != 1) {
+                                       throw new 
IncompatibleKeysException("Key selector functions are only compatible to one 
key");
+                               }
+                               
+                               
if(expressionKeys.keyFields.get(0).getType().equals(this.keyType)) {
+                                       return true;
+                               } else {
+                                       throw new 
IncompatibleKeysException(expressionKeys.keyFields.get(0).getType(), 
this.keyType);
+                               }
+                       } else {
+                               throw new IncompatibleKeysException("The key is 
not compatible with "+other);
                        }
                }
 
                @Override
                public int[] computeLogicalKeyPositions() {
-                       return new int[] {0};
+                       return logicalKeyFields;
                }
 
                @Override
@@ -188,93 +152,178 @@ public abstract class Keys<T> {
                        return "Key function (Type: " + keyType + ")";
                }
        }
-
-       // 
--------------------------------------------------------------------------------------------
-
+       
+       
+       /**
+        * Represents (nested) field access through string and integer-based 
keys for Composite Types (Tuple or Pojo)
+        */
        public static class ExpressionKeys<T> extends Keys<T> {
+               
+               public static final String SELECT_ALL_CHAR = "*";
+               
+               /**
+                * Flattened field descriptors representing the key fields
+                */
+               private List<FlatFieldDescriptor> keyFields;
+               
+               /**
+                * two constructors for field-based (tuple-type) keys
+                */
+               public ExpressionKeys(int[] groupingFields, TypeInformation<T> 
type) {
+                       this(groupingFields, type, false);
+               }
 
-               private int[] logicalPositions;
-
-               private final TypeInformation<?>[] types;
-
-               @SuppressWarnings("unused")
-               private PojoTypeInfo<?> type;
+               // keys defined by integer field positions
+               public ExpressionKeys(int[] groupingFields, TypeInformation<T> 
type, boolean allowEmpty) {
+                       if (!type.isTupleType()) {
+                               throw new InvalidProgramException("Specifying 
keys via field positions is only valid" +
+                                               "for tuple data types. Type: " 
+ type);
+                       }
 
-               public ExpressionKeys(String[] expressions, TypeInformation<T> 
type) {
-                       if (!(type instanceof PojoTypeInfo<?>)) {
-                               throw new UnsupportedOperationException("Key 
expressions can only be used on POJOs." + " " +
-                                               "A POJO must have a default 
constructor without arguments and not have readObject" +
-                                               " and/or writeObject methods. A 
current restriction is that it can only have nested POJOs or primitive (also 
boxed)" +
-                                               " fields.");
+                       if (!allowEmpty && (groupingFields == null || 
groupingFields.length == 0)) {
+                               throw new IllegalArgumentException("The 
grouping fields must not be empty.");
                        }
-                       PojoTypeInfo<?> pojoType = (PojoTypeInfo<?>) type;
-                       this.type = pojoType;
-                       logicalPositions = 
pojoType.getLogicalPositions(expressions);
-                       types = pojoType.getTypes(expressions);
-
-                       for (int i = 0; i < logicalPositions.length; i++) {
-                               if (logicalPositions[i] < 0) {
-                                       throw new 
IllegalArgumentException("Expression '" + expressions[i] + "' is not a valid 
key for POJO" +
-                                                       " type " + 
type.toString() + ".");
+                       // select all fields. Therefore, set all fields on this 
tuple level and let the logic handle the rest
+                       // (makes type assignment easier).
+                       if (groupingFields == null || groupingFields.length == 
0) {
+                               groupingFields = new int[type.getArity()];
+                               for (int i = 0; i < groupingFields.length; i++) 
{
+                                       groupingFields[i] = i;
                                }
+                       } else {
+                               groupingFields = 
rangeCheckFields(groupingFields, type.getArity() -1);
                        }
+                       TupleTypeInfoBase<?> tupleType = 
(TupleTypeInfoBase<?>)type;
+                       Preconditions.checkArgument(groupingFields.length > 0, 
"Grouping fields can not be empty at this point");
+                       
+                       keyFields = new 
ArrayList<FlatFieldDescriptor>(type.getTotalFields());
+                       // for each key, find the field:
+                       for(int j = 0; j < groupingFields.length; j++) {
+                               for(int i = 0; i < type.getArity(); i++) {
+                                       TypeInformation<?> fieldType = 
tupleType.getTypeAt(i);
+                                       
+                                       if(groupingFields[j] == i) { // check 
if user set the key
+                                               int keyId = 
countNestedElementsBefore(tupleType, i) + i;
+                                               if(fieldType instanceof 
TupleTypeInfoBase) {
+                                                       TupleTypeInfoBase<?> 
tupleFieldType = (TupleTypeInfoBase<?>) fieldType;
+                                                       
tupleFieldType.addAllFields(keyId, keyFields);
+                                               } else {
+                                                       
Preconditions.checkArgument(fieldType instanceof AtomicType, "Wrong field 
type");
+                                                       keyFields.add(new 
FlatFieldDescriptor(keyId, fieldType));
+                                               }
+                                               
+                                       }
+                               }
+                       }
+                       keyFields = removeNullElementsFromList(keyFields);
                }
-
+               
+               private static int 
countNestedElementsBefore(TupleTypeInfoBase<?> tupleType, int pos) {
+                       if( pos == 0) {
+                               return 0;
+                       }
+                       int ret = 0;
+                       for (int i = 0; i < pos; i++) {
+                               TypeInformation<?> fieldType = 
tupleType.getTypeAt(i);
+                               ret += fieldType.getTotalFields() -1;
+                       }
+                       return ret;
+               }
+               
+               public static <R> List<R> removeNullElementsFromList(List<R> 
in) {
+                       List<R> elements = new ArrayList<R>();
+                       for(R e: in) {
+                               if(e != null) {
+                                       elements.add(e);
+                               }
+                       }
+                       return elements;
+               }
+               
+               /**
+                * Create ExpressionKeys from String-expressions
+                */
+               public ExpressionKeys(String[] expressionsIn, 
TypeInformation<T> type) {
+                       if(!(type instanceof CompositeType<?>)) {
+                               throw new IllegalArgumentException("Type 
"+type+" is not a composite type. Key expressions are not supported.");
+                       }
+                       CompositeType<T> cType = (CompositeType<T>) type;
+                       
+                       String[] expressions = removeDuplicates(expressionsIn);
+                       if(expressionsIn.length != expressions.length) {
+                               LOG.warn("The key expressions contained 
duplicates. They are now unique");
+                       }
+                       // extract the keys on their flat position
+                       keyFields = new 
ArrayList<FlatFieldDescriptor>(expressions.length);
+                       for (int i = 0; i < expressions.length; i++) {
+                               List<FlatFieldDescriptor> keys = new 
ArrayList<FlatFieldDescriptor>(); // use separate list to do a size check
+                               cType.getKey(expressions[i], 0, keys);
+                               if(keys.size() == 0) {
+                                       throw new 
IllegalArgumentException("Unable to extract key from expression 
"+expressions[i]+" on key "+cType);
+                               }
+                               keyFields.addAll(keys);
+                       }
+               }
+               
                @Override
                public int getNumberOfKeyFields() {
-                       return logicalPositions.length;
+                       if(keyFields == null) {
+                               return 0;
+                       }
+                       return keyFields.size();
                }
 
                @Override
-               public boolean areCompatibale(Keys<?> other) {
+               public boolean areCompatible(Keys<?> other) throws 
IncompatibleKeysException {
 
                        if (other instanceof ExpressionKeys) {
                                ExpressionKeys<?> oKey = (ExpressionKeys<?>) 
other;
 
-                               if(oKey.types.length != this.types.length) {
-                                       return false;
+                               if(oKey.getNumberOfKeyFields() != 
this.getNumberOfKeyFields() ) {
+                                       throw new 
IncompatibleKeysException(IncompatibleKeysException.SIZE_MISMATCH_MESSAGE);
                                }
-                               for(int i=0; i<this.types.length; i++) {
-                                       
if(!this.types[i].equals(oKey.types[i])) {
-                                               return false;
+                               for(int i=0; i < this.keyFields.size(); i++) {
+                                       
if(!this.keyFields.get(i).getType().equals(oKey.keyFields.get(i).getType())) {
+                                               throw new 
IncompatibleKeysException(this.keyFields.get(i).getType(), 
oKey.keyFields.get(i).getType() );
                                        }
                                }
                                return true;
-
+                       } else if(other instanceof SelectorFunctionKeys<?, ?>) {
+                               return other.areCompatible(this);
                        } else {
-                               return false;
+                               throw new IncompatibleKeysException("The key is 
not compatible with "+other);
                        }
                }
 
                @Override
                public int[] computeLogicalKeyPositions() {
-                       return logicalPositions;
+                       List<Integer> logicalKeys = new LinkedList<Integer>();
+                       for(FlatFieldDescriptor kd : keyFields) {
+                               logicalKeys.addAll( 
Ints.asList(kd.getPosition()));
+                       }
+                       return Ints.toArray(logicalKeys);
                }
+               
+       }
+       
+       private static String[] removeDuplicates(String[] in) {
+               List<String> ret = new LinkedList<String>();
+               for(String el : in) {
+                       if(!ret.contains(el)) {
+                               ret.add(el);
+                       }
+               }
+               return ret.toArray(new String[ret.size()]);
        }
-
-
+       // 
--------------------------------------------------------------------------------------------
+       
+       
        // 
--------------------------------------------------------------------------------------------
        //  Utilities
        // 
--------------------------------------------------------------------------------------------
 
-       private static int[] makeFields(int[] fields, TupleTypeInfoBase<?> 
type) {
-               int inLength = type.getArity();
-
-               // null parameter means all fields are considered
-               if (fields == null || fields.length == 0) {
-                       fields = new int[inLength];
-                       for (int i = 0; i < inLength; i++) {
-                               fields[i] = i;
-                       }
-                       return fields;
-               } else {
-                       return rangeCheckAndOrderFields(fields, inLength-1);
-               }
-       }
 
-       private static final int[] rangeCheckAndOrderFields(int[] fields, int 
maxAllowedField) {
-               // order
-               Arrays.sort(fields);
+       private static final int[] rangeCheckFields(int[] fields, int 
maxAllowedField) {
 
                // range check and duplicate eliminate
                int i = 1, k = 0;
@@ -285,12 +334,12 @@ public abstract class Keys<T> {
                }
 
                for (; i < fields.length; i++) {
-                       if (fields[i] < 0 || i > maxAllowedField) {
+                       if (fields[i] < 0 || fields[i] > maxAllowedField) {
                                throw new IllegalArgumentException("Tuple 
position is out of range.");
                        }
-
                        if (fields[i] != last) {
                                k++;
+                               last = fields[i];
                                fields[k] = fields[i];
                        }
                }
@@ -299,7 +348,20 @@ public abstract class Keys<T> {
                if (k == fields.length - 1) {
                        return fields;
                } else {
-                       return Arrays.copyOfRange(fields, 0, k);
+                       return Arrays.copyOfRange(fields, 0, k+1);
+               }
+       }
+       
+       public static class IncompatibleKeysException extends Exception {
+               private static final long serialVersionUID = 1L;
+               public static final String SIZE_MISMATCH_MESSAGE = "The number 
of specified keys is different.";
+               
+               public IncompatibleKeysException(String message) {
+                       super(message);
+               }
+
+               public IncompatibleKeysException(TypeInformation<?> 
typeInformation, TypeInformation<?> typeInformation2) {
+                       super(typeInformation+" and "+typeInformation2+" are 
not compatible");
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
index f0931b5..532e464 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
@@ -50,7 +50,7 @@ public class PartitionOperator<T> extends 
SingleInputUdfOperator<T, T, Partition
                        throw new UnsupportedOperationException("Range 
Partitioning not yet supported");
                }
                
-               if(pKeys instanceof Keys.FieldPositionKeys<?> && 
!input.getType().isTupleType()) {
+               if(pKeys instanceof Keys.ExpressionKeys<?> && 
!input.getType().isTupleType()) {
                        throw new IllegalArgumentException("Hash Partitioning 
with key fields only possible on Tuple DataSets");
                }
                
@@ -83,7 +83,7 @@ public class PartitionOperator<T> extends 
SingleInputUdfOperator<T, T, Partition
                } 
                else if (pMethod == PartitionMethod.HASH) {
                        
-                       if (pKeys instanceof Keys.FieldPositionKeys) {
+                       if (pKeys instanceof Keys.ExpressionKeys) {
                                
                                int[] logicalKeyPositions = 
pKeys.computeLogicalKeyPositions();
                                UnaryOperatorInformation<T, T> operatorInfo = 
new UnaryOperatorInformation<T, T>(getType(), getType());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index 463b31c..8cb64ba 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -101,8 +101,7 @@ public class ReduceOperator<IN> extends 
SingleInputUdfOperator<IN, IN, ReduceOpe
                        MapOperatorBase<?, IN, ?> po = 
translateSelectorFunctionReducer(selectorKeys, function, getInputType(), name, 
input, this.getParallelism());
                        return po;
                }
-               else if (grouper.getKeys() instanceof Keys.FieldPositionKeys ||
-                               grouper.getKeys() instanceof 
Keys.ExpressionKeys) {
+               else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
                        
                        // reduce with field positions
                        int[] logicalKeyPositions = 
grouper.getKeys().computeLogicalKeyPositions();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index e862b5a..6272538 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -52,6 +52,11 @@ public class GenericTypeInfo<T> extends TypeInformation<T> 
implements AtomicType
        public int getArity() {
                return 1;
        }
+       
+       @Override
+       public int getTotalFields() {
+               return 1;
+       }
 
        @Override
        public Class<T> getTypeClass() {
@@ -65,6 +70,9 @@ public class GenericTypeInfo<T> extends TypeInformation<T> 
implements AtomicType
 
        @Override
        public TypeSerializer<T> createSerializer() {
+               // NOTE: The TypeExtractor / pojo logic is assuming that we are 
using an Avro Serializer here
+               // in particular classes implementing GenericContainer are 
handled as GenericTypeInfos 
+               // (this will probably not work with Kryo)
                return new AvroSerializer<T>(this.typeClass);
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
index 226557f..55128e6 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
@@ -64,6 +64,11 @@ public class ObjectArrayTypeInfo<T, C> extends 
TypeInformation<T> {
        public int getArity() {
                return 1;
        }
+       
+       @Override
+       public int getTotalFields() {
+               return 1;
+       }
 
        @SuppressWarnings("unchecked")
        @Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
index 1b8ef35..bf0e25a 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
@@ -22,7 +22,7 @@ import java.lang.reflect.Field;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
-class PojoField {
+public class PojoField {
        public Field field;
        public TypeInformation<?> type;
 
@@ -30,4 +30,9 @@ class PojoField {
                this.field = field;
                this.type = type;
        }
-}              
+
+       @Override
+       public String toString() {
+               return "PojoField " + field.getDeclaringClass() + "." + 
field.getName() + " (" + type + ")";
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 51ed507..fba1f24 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -18,33 +18,41 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import com.google.common.base.Joiner;
-
 import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
+import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.typeinfo.AtomicType;
-import org.apache.flink.api.common.typeinfo.CompositeType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 
+import com.google.common.base.Joiner;
+
 
 /**
- *
+ * TypeInformation for arbitrary Java objects that follow the JavaBeans 
conventions (what we call POJOs).
+ * 
  */
-public class PojoTypeInfo<T> extends TypeInformation<T> implements 
CompositeType<T> {
+public class PojoTypeInfo<T> extends CompositeType<T>{
 
        private final Class<T> typeClass;
 
        private PojoField[] fields;
+       
+       private int totalFields;
 
        public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
+               super(typeClass);
                this.typeClass = typeClass;
                List<PojoField> tempFields = new ArrayList<PojoField>(fields);
                Collections.sort(tempFields, new Comparator<PojoField>() {
@@ -54,6 +62,14 @@ public class PojoTypeInfo<T> extends TypeInformation<T> 
implements CompositeType
                        }
                });
                this.fields = tempFields.toArray(new 
PojoField[tempFields.size()]);
+               
+               // check if POJO is public
+               if(!Modifier.isPublic(typeClass.getModifiers())) {
+                       throw new RuntimeException("POJO "+typeClass+" is not 
public");
+               }
+               for(PojoField field : fields) {
+                       totalFields += field.type.getTotalFields();
+               }
        }
 
        @Override
@@ -71,6 +87,11 @@ public class PojoTypeInfo<T> extends TypeInformation<T> 
implements CompositeType
        public int getArity() {
                return fields.length;
        }
+       
+       @Override
+       public int getTotalFields() {
+               return totalFields;
+       }
 
        @Override
        public Class<T> getTypeClass() {
@@ -82,18 +103,6 @@ public class PojoTypeInfo<T> extends TypeInformation<T> 
implements CompositeType
                return Comparable.class.isAssignableFrom(typeClass);
        }
 
-       @Override
-       public TypeSerializer<T> createSerializer() {
-               TypeSerializer<?>[] fieldSerializers = new 
TypeSerializer<?>[fields.length];
-               Field[] reflectiveFields = new Field[fields.length];
-
-               for (int i = 0; i < fields.length; i++) {
-                       fieldSerializers[i] = fields[i].type.createSerializer();
-                       reflectiveFields[i] = fields[i].field;
-               }
-
-               return new PojoSerializer<T>(this.typeClass, fieldSerializers, 
reflectiveFields);
-       }
 
        @Override
        public String toString() {
@@ -105,73 +114,124 @@ public class PojoTypeInfo<T> extends TypeInformation<T> 
implements CompositeType
                                + ", fields = [" + Joiner.on(", 
").join(fieldStrings) + "]"
                                + ">";
        }
-
-       public int getLogicalPosition(String fieldExpression) {
-               for (int i = 0; i < fields.length; i++) {
-                       if (fields[i].field.getName().equals(fieldExpression)) {
-                               return i;
+       
+       @Override
+       public void getKey(String fieldExpression, int offset, 
List<FlatFieldDescriptor> result) {
+               // handle 'select all' first
+               if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR)) {
+                       int keyPosition = 0;
+                       for(PojoField field : fields) {
+                               if(field.type instanceof AtomicType) {
+                                       result.add(new 
FlatFieldDescriptor(offset + keyPosition, field.type));
+                               } else if(field.type instanceof CompositeType) {
+                                       CompositeType<?> cType = 
(CompositeType<?>)field.type;
+                                       
cType.getKey(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + 
keyPosition, result);
+                                       keyPosition += cType.getTotalFields()-1;
+                               } else {
+                                       throw new RuntimeException("Unexpected 
key type: "+field.type);
+                               }
+                               keyPosition++;
                        }
+                       return;
+               }
+               Validate.notEmpty(fieldExpression, "Field expression must not 
be empty.");
+               // if there is a dot try getting the field from that sub field
+               int firstDot = fieldExpression.indexOf('.');
+               if (firstDot == -1) {
+                       // this is the last field (or only field) in the field 
expression
+                       int fieldId = 0;
+                       for (int i = 0; i < fields.length; i++) {
+                               if(fields[i].type instanceof CompositeType) {
+                                       fieldId += 
fields[i].type.getTotalFields()-1;
+                               }
+                               if 
(fields[i].field.getName().equals(fieldExpression)) {
+                                       result.add(new 
FlatFieldDescriptor(offset + fieldId, fields[i].type));
+                                       return;
+                               }
+                               fieldId++;
+                       }
+               } else {
+                       // split and go deeper
+                       String firstField = fieldExpression.substring(0, 
firstDot);
+                       String rest = fieldExpression.substring(firstDot + 1);
+                       int fieldId = 0;
+                       for (int i = 0; i < fields.length; i++) {
+                               if 
(fields[i].field.getName().equals(firstField)) {
+                                       if (!(fields[i].type instanceof 
CompositeType<?>)) {
+                                               throw new 
RuntimeException("Field "+fields[i].type+" is not composite type");
+                                       }
+                                       CompositeType<?> cType = 
(CompositeType<?>) fields[i].type;
+                                       cType.getKey(rest, offset + fieldId, 
result); // recurse
+                                       return;
+                               }
+                               fieldId++;
+                       }
+                       throw new RuntimeException("Unable to find field 
"+fieldExpression+" in type "+this+" (looking for '"+firstField+"')");
                }
-               return -1;
        }
 
-       public int[] getLogicalPositions(String[] fieldExpression) {
-               int[] result = new int[fieldExpression.length];
-               for (int i = 0; i < fieldExpression.length; i++) {
-                       result[i] = getLogicalPosition(fieldExpression[i]);
+       @Override
+       public <X> TypeInformation<X> getTypeAt(int pos) {
+               if (pos < 0 || pos >= this.fields.length) {
+                       throw new IndexOutOfBoundsException();
                }
-               return result;
+               @SuppressWarnings("unchecked")
+               TypeInformation<X> typed = (TypeInformation<X>) 
fields[pos].type;
+               return typed;
        }
 
-       public TypeInformation<?> getType(String fieldExpression) {
-               for (int i = 0; i < fields.length; i++) {
-                       if (fields[i].field.getName().equals(fieldExpression)) {
-                               return fields[i].type;
-                       }
+       // used for testing. Maybe use mockito here
+       public PojoField getPojoFieldAt(int pos) {
+               if (pos < 0 || pos >= this.fields.length) {
+                       throw new IndexOutOfBoundsException();
                }
-               return null;
+               return this.fields[pos];
        }
 
-       public TypeInformation<?>[] getTypes(String[] fieldExpression) {
-               TypeInformation<?>[] result = new 
TypeInformation<?>[fieldExpression.length];
-               for (int i = 0; i < fieldExpression.length; i++) {
-                       result[i] = getType(fieldExpression[i]);
-               }
-               return result;
+       /**
+        * Comparator creation
+        */
+       private TypeComparator<?>[] fieldComparators;
+       private Field[] keyFields;
+       private int comparatorHelperIndex = 0;
+       @Override
+       protected void initializeNewComparator(int keyCount) {
+               fieldComparators = new TypeComparator<?>[keyCount];
+               keyFields = new Field[keyCount];
+               comparatorHelperIndex = 0;
        }
 
        @Override
-       public TypeComparator<T> createComparator(int[] logicalKeyFields, 
boolean[] orders) {
-               // sanity checks
-               if (logicalKeyFields == null || orders == null || 
logicalKeyFields.length != orders.length ||
-                               logicalKeyFields.length > fields.length)
-               {
-                       throw new IllegalArgumentException();
-               }
+       protected void addCompareField(int fieldId, TypeComparator<?> 
comparator) {
+               fieldComparators[comparatorHelperIndex] = comparator;
+               keyFields[comparatorHelperIndex] = fields[fieldId].field;
+               comparatorHelperIndex++;
+       }
 
-//             if (logicalKeyFields.length == 1) {
-//                     return createSinglefieldComparator(logicalKeyFields[0], 
orders[0], types[logicalKeyFields[0]]);
-//             }
+       @Override
+       protected TypeComparator<T> getNewComparator() {
+               // first remove the null array fields
+               final Field[] finalKeyFields = Arrays.copyOf(keyFields, 
comparatorHelperIndex);
+               @SuppressWarnings("rawtypes")
+               final TypeComparator[] finalFieldComparators = 
Arrays.copyOf(fieldComparators, comparatorHelperIndex);
+               if(finalFieldComparators.length == 0 || finalKeyFields.length 
== 0 ||  finalFieldComparators.length != finalKeyFields.length) {
+                       throw new IllegalArgumentException("Pojo comparator 
creation has a bug");
+               }
+               return new PojoComparator<T>(finalKeyFields, 
finalFieldComparators, createSerializer(), typeClass);
+       }
 
-               // create the comparators for the individual fields
-               TypeComparator<?>[] fieldComparators = new 
TypeComparator<?>[logicalKeyFields.length];
-               Field[] keyFields = new Field[logicalKeyFields.length];
 
-               for (int i = 0; i < logicalKeyFields.length; i++) {
-                       int field = logicalKeyFields[i];
+       @Override
+       public TypeSerializer<T> createSerializer() {
+               TypeSerializer<?>[] fieldSerializers = new 
TypeSerializer<?>[fields.length ];
+               Field[] reflectiveFields = new Field[fields.length];
 
-                       if (field < 0 || field >= fields.length) {
-                               throw new IllegalArgumentException("The field 
position " + field + " is out of range [0," + fields.length + ")");
-                       }
-                       if (fields[field].type.isKeyType() && 
fields[field].type instanceof AtomicType) {
-                               fieldComparators[i] = ((AtomicType<?>) 
fields[field].type).createComparator(orders[i]);
-                               keyFields[i] = fields[field].field;
-                               keyFields[i].setAccessible(true);
-                       } else {
-                               throw new IllegalArgumentException("The field 
at position " + field + " (" + fields[field].type + ") is no atomic key type.");
-                       }
+               for (int i = 0; i < fields.length; i++) {
+                       fieldSerializers[i] = fields[i].type.createSerializer();
+                       reflectiveFields[i] = fields[i].field;
                }
 
-               return new PojoComparator<T>(keyFields, fieldComparators, 
createSerializer(), typeClass);
+               return new PojoSerializer<T>(this.typeClass, fieldSerializers, 
reflectiveFields);
        }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
index f069eed..2464f25 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
@@ -43,6 +43,11 @@ public class RecordTypeInfo extends TypeInformation<Record> {
        public int getArity() {
                return 1;
        }
+       
+       @Override
+       public int getTotalFields() {
+               return 1;
+       }
 
        @Override
        public Class<Record> getTypeClass() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
index 6edb08c..82f9c50 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
@@ -20,22 +20,21 @@ package org.apache.flink.api.java.typeutils;
 
 import java.util.Arrays;
 
-import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-
 //CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
 import org.apache.flink.api.java.tuple.*;
 //CHECKSTYLE.ON: AvoidStarImport
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 
 
 
-public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> 
{
 
+public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> 
{
+       
        @SuppressWarnings("unchecked")
        public TupleTypeInfo(TypeInformation<?>... types) {
                this((Class<T>) CLASSES[types.length - 1], types);
@@ -60,59 +59,72 @@ public final class TupleTypeInfo<T extends Tuple> extends 
TupleTypeInfoBase<T> {
                return new TupleSerializer<T>(tupleClass, fieldSerializers);
        }
        
+       /**
+        * Comparator creation
+        */
+       private TypeSerializer<?>[] fieldSerializers;
+       private TypeComparator<?>[] fieldComparators;
+       private int[] logicalKeyFields;
+       private int comparatorHelperIndex = 0;
+       
        @Override
-       public TypeComparator<T> createComparator(int[] logicalKeyFields, 
boolean[] orders) {
-               // sanity checks
-               if (logicalKeyFields == null || orders == null || 
logicalKeyFields.length != orders.length ||
-                               logicalKeyFields.length > types.length)
-               {
-                       throw new IllegalArgumentException();
-               }
+       protected void initializeNewComparator(int localKeyCount) {
+               fieldSerializers = new TypeSerializer[localKeyCount];
+               fieldComparators = new TypeComparator<?>[localKeyCount];
+               logicalKeyFields = new int[localKeyCount];
+               comparatorHelperIndex = 0;
+       }
 
-               int maxKey = -1;
-               for (int key : logicalKeyFields){
-                       maxKey = Math.max(key, maxKey);
-               }
-               
-               if (maxKey >= this.types.length) {
-                       throw new IllegalArgumentException("The key position " 
+ maxKey + " is out of range for Tuple" + types.length);
-               }
-               
-               // create the comparators for the individual fields
-               TypeComparator<?>[] fieldComparators = new 
TypeComparator<?>[logicalKeyFields.length];
-               for (int i = 0; i < logicalKeyFields.length; i++) {
-                       int keyPos = logicalKeyFields[i];
-                       if (types[keyPos].isKeyType() && types[keyPos] 
instanceof AtomicType) {
-                               fieldComparators[i] = ((AtomicType<?>) 
types[keyPos]).createComparator(orders[i]);
-                       } else if(types[keyPos].isTupleType() && types[keyPos] 
instanceof TupleTypeInfo){ // Check for tuple
-                               TupleTypeInfo<?> tupleType = (TupleTypeInfo<?>) 
types[keyPos];
-                               
-                               // All fields are key
-                               int[] allFieldsKey = new 
int[tupleType.types.length];
-                               for(int h = 0; h < tupleType.types.length; h++){
-                                       allFieldsKey[h]=h;
-                               }
-                               
-                               // Prepare order
-                               boolean[] tupleOrders = new 
boolean[tupleType.types.length];
-                               Arrays.fill(tupleOrders, orders[i]);
-                               fieldComparators[i] = 
tupleType.createComparator(allFieldsKey, tupleOrders);
-                       } else {
-                               throw new IllegalArgumentException("The field 
at position " + i + " (" + types[keyPos] + ") is no atomic key type nor tuple 
type.");
-                       }
-               }
-               
+       @Override
+       protected void addCompareField(int fieldId, TypeComparator<?> 
comparator) {
+               fieldComparators[comparatorHelperIndex] = comparator;
+               fieldSerializers[comparatorHelperIndex] = 
types[fieldId].createSerializer();
+               logicalKeyFields[comparatorHelperIndex] = fieldId;
+               comparatorHelperIndex++;
+       }
+
+       @Override
+       protected TypeComparator<T> getNewComparator() {
+               @SuppressWarnings("rawtypes")
+               final TypeComparator[] finalFieldComparators = 
Arrays.copyOf(fieldComparators, comparatorHelperIndex);
+               final int[] finalLogicalKeyFields = 
Arrays.copyOf(logicalKeyFields, comparatorHelperIndex);
+               //final TypeSerializer[] finalFieldSerializers = 
Arrays.copyOf(fieldSerializers, comparatorHelperIndex);
                // create the serializers for the prefix up to highest key 
position
+               int maxKey = 0;
+               for(int key : finalLogicalKeyFields) {
+                       maxKey = Math.max(maxKey, key);
+               }
                TypeSerializer<?>[] fieldSerializers = new 
TypeSerializer<?>[maxKey + 1];
                for (int i = 0; i <= maxKey; i++) {
                        fieldSerializers[i] = types[i].createSerializer();
                }
-               
-               return new TupleComparator<T>(logicalKeyFields, 
fieldComparators, fieldSerializers);
+               if(finalFieldComparators.length == 0 || 
finalLogicalKeyFields.length == 0 || fieldSerializers.length == 0 
+                               || finalFieldComparators.length != 
finalLogicalKeyFields.length) {
+                       throw new IllegalArgumentException("Tuple comparator 
creation has a bug");
+               }
+               return new TupleComparator<T>(finalLogicalKeyFields, 
finalFieldComparators, fieldSerializers);
        }
        
        // 
--------------------------------------------------------------------------------------------
-
+       
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof TupleTypeInfo) {
+                       @SuppressWarnings("unchecked")
+                       TupleTypeInfo<T> other = (TupleTypeInfo<T>) obj;
+                       return ((this.tupleType == null && other.tupleType == 
null) || this.tupleType.equals(other.tupleType)) &&
+                                       Arrays.deepEquals(this.types, 
other.types);
+                       
+               } else {
+                       return false;
+               }
+       }
+       
+       @Override
+       public int hashCode() {
+               return this.types.hashCode() ^ Arrays.deepHashCode(this.types);
+       }
+       
        @Override
        public String toString() {
                return "Java " + super.toString();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index 3e1b646..4babbd7 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -19,19 +19,31 @@
 package org.apache.flink.api.java.typeutils;
 
 import java.util.Arrays;
+import java.util.List;
 
-import org.apache.flink.api.common.typeinfo.CompositeType;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
 
-public abstract class TupleTypeInfoBase<T> extends TypeInformation<T> 
implements CompositeType<T> {
+import com.google.common.base.Preconditions;
+
+public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
        
        protected final TypeInformation<?>[] types;
        
        protected final Class<T> tupleType;
+
+       private int totalFields;
        
        public TupleTypeInfoBase(Class<T> tupleType, TypeInformation<?>... 
types) {
+               super(tupleType);
                this.tupleType = tupleType;
                this.types = types;
+               for(TypeInformation<?> type : types) {
+                       totalFields += type.getTotalFields();
+               }
        }
 
        @Override
@@ -48,6 +60,11 @@ public abstract class TupleTypeInfoBase<T> extends 
TypeInformation<T> implements
        public int getArity() {
                return types.length;
        }
+       
+       @Override
+       public int getTotalFields() {
+               return totalFields;
+       }
 
        @Override
        public Class<T> getTypeClass() {
@@ -55,6 +72,95 @@ public abstract class TupleTypeInfoBase<T> extends 
TypeInformation<T> implements
        }
 
        
+       /**
+        * Recursively add all fields in this tuple type. We need this in 
particular to get all
+        * the types.
+        * @param keyId
+        * @param keyFields
+        */
+       public void addAllFields(int startKeyId, List<FlatFieldDescriptor> 
keyFields) {
+               for(int i = 0; i < this.getArity(); i++) {
+                       TypeInformation<?> type = this.types[i];
+                       if(type instanceof AtomicType) {
+                               keyFields.add(new 
FlatFieldDescriptor(startKeyId, type));
+                       } else if(type instanceof TupleTypeInfoBase<?>) {
+                               TupleTypeInfoBase<?> ttb = 
(TupleTypeInfoBase<?>) type;
+                               ttb.addAllFields(startKeyId, keyFields);
+                       }
+                       startKeyId += type.getTotalFields();
+               }
+       }
+       
+
+       @Override
+       public void getKey(String fieldExpression, int offset, 
List<FlatFieldDescriptor> result) {
+               // handle 'select all'
+               if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR)) {
+                       int keyPosition = 0;
+                       for(TypeInformation<?> type : types) {
+                               if(type instanceof AtomicType) {
+                                       result.add(new 
FlatFieldDescriptor(offset + keyPosition, type));
+                               } else if(type instanceof CompositeType) {
+                                       CompositeType<?> cType = 
(CompositeType<?>)type;
+                                       
cType.getKey(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + 
keyPosition, result);
+                                       keyPosition += cType.getTotalFields()-1;
+                               } else {
+                                       throw new RuntimeException("Unexpected 
key type: "+type);
+                               }
+                               keyPosition++;
+                       }
+                       return;
+               }
+               // check input
+               if(fieldExpression.length() < 2) {
+                       throw new IllegalArgumentException("The field 
expression '"+fieldExpression+"' is incorrect. The length must be at least 2");
+               }
+               if(fieldExpression.charAt(0) != 'f') {
+                       throw new IllegalArgumentException("The field 
expression '"+fieldExpression+"' is incorrect for a Tuple type. It has to start 
with an 'f'");
+               }
+               // get first component of nested expression
+               int dotPos = fieldExpression.indexOf('.');
+               String nestedSplitFirst = fieldExpression;
+               if(dotPos != -1 ) {
+                       Preconditions.checkArgument(dotPos != 
fieldExpression.length()-1, "The field expression can never end with a dot.");
+                       nestedSplitFirst = fieldExpression.substring(0, dotPos);
+               }
+               String fieldNumStr = nestedSplitFirst.substring(1, 
nestedSplitFirst.length());
+               if(!StringUtils.isNumeric(fieldNumStr)) {
+                       throw new IllegalArgumentException("The field 
expression '"+fieldExpression+"' is incorrect. Field number '"+fieldNumStr+" is 
not numeric");
+               }
+               int pos = -1;
+               try {
+                       pos = Integer.valueOf(fieldNumStr);
+               } catch(NumberFormatException nfe) {
+                       throw new IllegalArgumentException("The field 
expression '"+fieldExpression+"' is incorrect. Field number '"+fieldNumStr+" is 
not numeric", nfe);
+               }
+               if(pos < 0) {
+                       throw new IllegalArgumentException("Negative position 
is not possible");
+               }
+               // pass down the remainder (after the dot) of the 
fieldExpression to the type at that position.
+               if(dotPos != -1) { // we need to go deeper
+                       String rem = fieldExpression.substring(dotPos+1);
+                       if( !(types[pos] instanceof CompositeType<?>) ) {
+                               throw new RuntimeException("Element at position 
"+pos+" is not a composite type. There are no nested types to select");
+                       }
+                       CompositeType<?> cType = (CompositeType<?>) types[pos];
+                       cType.getKey(rem, offset + pos, result);
+                       return;
+               }
+               
+               if(pos >= types.length) {
+                       throw new IllegalArgumentException("The specified tuple 
position does not exist");
+               }
+               
+               // count nested fields before "pos".
+               for(int i = 0; i < pos; i++) {
+                       offset += types[i].getTotalFields() - 1; // this adds 
only something to offset if its a composite type.
+               }
+               
+               result.add(new FlatFieldDescriptor(offset + pos, types[pos]));
+       }
+       
        public <X> TypeInformation<X> getTypeAt(int pos) {
                if (pos < 0 || pos >= this.types.length) {
                        throw new IndexOutOfBoundsException();
@@ -115,4 +221,5 @@ public abstract class TupleTypeInfoBase<T> extends 
TypeInformation<T> implements
                bld.append('>');
                return bld.toString();
        }
+       
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 55f6b1f..bc92cdd 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -30,6 +30,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.avro.generic.GenericContainer;
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.CrossFunction;
@@ -47,13 +48,19 @@ import 
org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
 
 public class TypeExtractor {
+       private static final Logger LOG = 
LoggerFactory.getLogger(TypeExtractor.class);
 
        // We need this to detect recursive types and not get caught
        // in an endless recursion
@@ -220,6 +227,29 @@ public class TypeExtractor {
                // get info from hierarchy
                return (TypeInformation<OUT>) 
createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
        }
+
+
+       /**
+        * @param curT : start type
+        * @return Type The immediate child of the top class
+        */
+       private Type recursivelyGetTypeHierarchy(ArrayList<Type> typeHierarchy, 
Type curT, Class<?> stopAtClass) {
+               while (!(curT instanceof ParameterizedType && ((Class<?>) 
((ParameterizedType) curT).getRawType()).equals(
+                               stopAtClass))
+                               && !(curT instanceof Class<?> && ((Class<?>) 
curT).equals(stopAtClass))) {
+                       typeHierarchy.add(curT);
+                       
+                       // parameterized type
+                       if (curT instanceof ParameterizedType) {
+                               curT = ((Class<?>) ((ParameterizedType) 
curT).getRawType()).getGenericSuperclass();
+                       }
+                       // class
+                       else {
+                               curT = ((Class<?>) curT).getGenericSuperclass();
+                       }
+               }
+               return curT;
+       }
        
        @SuppressWarnings({ "unchecked", "rawtypes" })
        private <IN1, IN2, OUT> TypeInformation<OUT> 
createTypeInfoWithTypeHierarchy(ArrayList<Type> typeHierarchy, Type t,
@@ -227,7 +257,7 @@ public class TypeExtractor {
                
                // check if type is a subclass of tuple
                if ((t instanceof Class<?> && 
Tuple.class.isAssignableFrom((Class<?>) t))
-                               || (t instanceof ParameterizedType && 
Tuple.class.isAssignableFrom((Class<?>) ((ParameterizedType) t).getRawType()))) 
{
+                       || (t instanceof ParameterizedType && 
Tuple.class.isAssignableFrom((Class<?>) ((ParameterizedType) t).getRawType()))) 
{
                        
                        Type curT = t;
                        
@@ -236,7 +266,7 @@ public class TypeExtractor {
                                throw new InvalidTypesException(
                                                "Usage of class Tuple as a type 
is not allowed. Use a concrete subclass (e.g. Tuple1, Tuple2, etc.) instead.");
                        }
-                       
+                                               
                        // go up the hierarchy until we reach immediate child 
of Tuple (with or without generics)
                        // collect the types while moving up for a later 
top-down 
                        while (!(curT instanceof ParameterizedType && 
((Class<?>) ((ParameterizedType) curT).getRawType()).getSuperclass().equals(
@@ -295,15 +325,23 @@ public class TypeExtractor {
                                }
                        }
                        
-                       // TODO: Check that type that extends Tuple does not 
have additional fields.
-                       // Right now, these fields are not be serialized by the 
TupleSerializer. 
-                       // We might want to add an ExtendedTupleSerializer for 
that. 
-                       
+                       Class<?> tAsClass = null;
                        if (t instanceof Class<?>) {
-                               return new TupleTypeInfo(((Class<? extends 
Tuple>) t), tupleSubTypes);
+                               tAsClass = (Class<?>) t;
                        } else if (t instanceof ParameterizedType) {
-                               return new TupleTypeInfo(((Class<? extends 
Tuple>) ((ParameterizedType) t).getRawType()), tupleSubTypes);
+                               tAsClass = (Class<? extends Tuple>) 
((ParameterizedType) t).getRawType();
                        }
+                       Preconditions.checkNotNull(tAsClass, "t has a 
unexpected type");
+                       // check if the class we assumed to be a Tuple so far 
is actually a pojo because it contains additional fields.
+                       // check for additional fields.
+                       int fieldCount = countFieldsInClass(tAsClass);
+                       if(fieldCount != tupleSubTypes.length) {
+                               // the class is not a real tuple because it 
contains additional fields. treat as a pojo
+                               return (TypeInformation<OUT>) 
analyzePojo(tAsClass, new ArrayList<Type>() ); // the typeHierarchy here should 
be sufficient, even though it stops at the Tuple.class.
+                       }
+                       
+                       return new TupleTypeInfo(tAsClass, tupleSubTypes);
+                       
                }
                // type depends on another type
                // e.g. class MyMapper<E> extends MapFunction<String, E>
@@ -361,16 +399,29 @@ public class TypeExtractor {
                }
                // objects with generics are treated as raw type
                else if (t instanceof ParameterizedType) {
-                       return privateGetForClass((Class<OUT>) 
((ParameterizedType) t).getRawType());
+                       return privateGetForClass((Class<OUT>) 
((ParameterizedType) t).getRawType(), new ArrayList<Type>()); // pass new type 
hierarchies here because
+                       // while creating the TH here, we assumed a tuple type.
                }
                // no tuple, no TypeVariable, no generic type
                else if (t instanceof Class) {
-                       return privateGetForClass((Class<OUT>) t);
+                       return privateGetForClass((Class<OUT>) t, new 
ArrayList<Type>());
                }
                
                throw new InvalidTypesException("Type Information could not be 
created.");
        }
        
+       /**
+        * Counts the fields returned by {@link Class#getFields()} (i.e. the public fields,
+        * including inherited ones) that are neither static nor transient.
+        *
+        * @param clazz class whose fields are counted
+        * @return number of public, non-static, non-transient fields
+        */
+       private int countFieldsInClass(Class<?> clazz) {
+               int count = 0;
+               for (Field candidate : clazz.getFields()) {
+                       final int mods = candidate.getModifiers();
+                       // skip everything that does not belong to the object's state
+                       if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
+                               continue;
+                       }
+                       count++;
+               }
+               return count;
+       }
+
        private <IN1, IN2> TypeInformation<?> 
createTypeInfoFromInputs(TypeVariable<?> returnTypeVar, ArrayList<Type> 
returnTypeHierarchy, 
                        TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> 
in2TypeInfo) {
 
@@ -383,7 +434,7 @@ public class TypeExtractor {
                else {
                        returnTypeVar = (TypeVariable<?>) matReturnTypeVar;
                }
-
+               
                TypeInformation<?> info = null;
                if (in1TypeInfo != null) {
                        // find the deepest type variable that describes the 
type of input 1
@@ -806,11 +857,11 @@ public class TypeExtractor {
        }
        
        public static <X> TypeInformation<X> getForClass(Class<X> clazz) {
-               return new TypeExtractor().privateGetForClass(clazz);
+               return new TypeExtractor().privateGetForClass(clazz, new 
ArrayList<Type>());
        }
        
        @SuppressWarnings("unchecked")
-       private <X> TypeInformation<X> privateGetForClass(Class<X> clazz) {
+       private <X> TypeInformation<X> privateGetForClass(Class<X> clazz, 
ArrayList<Type> typeHierarchy) {
                Validate.notNull(clazz);
                
                // check for abstract classes or interfaces
@@ -819,10 +870,8 @@ public class TypeExtractor {
                }
 
                if (clazz.equals(Object.class)) {
-                       // this will occur when trying to analyze POJOs that 
have generic, this
-                       // exception will be caught and a GenericTypeInfo will 
be created for the type.
-                       // at some point we might support this using Kryo
-                       throw new InvalidTypesException("Object is not a valid 
type.");
+                       // TODO (merging): better throw an exception here. the 
runtime does not support it yet
+                       return new GenericTypeInfo<X>(clazz);
                }
                
                // check for arrays
@@ -879,62 +928,135 @@ public class TypeExtractor {
                        // special case handling for Class, this should not be 
handled by the POJO logic
                        return new GenericTypeInfo<X>(clazz);
                }
+               if(GenericContainer.class.isAssignableFrom(clazz)) {
+                       // this is a type generated by Avro. GenericTypeInfo is able to handle this case because it is using Avro.
+                       return new GenericTypeInfo<X>(clazz);
+               }
+               TypeInformation<X> pojoType = analyzePojo(clazz, typeHierarchy);
+               if (pojoType != null) {
+                       return pojoType;
+               }
 
-//             Disable POJO types for now (see 
https://mail-archives.apache.org/mod_mbox/incubator-flink-dev/201407.mbox/%3C53D96049.1060509%40cse.uta.edu%3E)
-//
-//             TypeInformation<X> pojoType =  analyzePojo(clazz);
-//             if (pojoType != null) {
-//                     return pojoType;
-//             }
 
                // return a generic type
                return new GenericTypeInfo<X>(clazz);
        }
+       
+       /**
+        * Checks if the given field is a valid pojo field:
+        *  - it is public
+        * OR
+        *  - there are getter and setter methods for the field.
+        *
+        * A getter is a public method without parameters whose lower-cased name contains
+        * "get" + the lower-cased field name and whose return type is the field type (or the
+        * field's type variable). A setter is a public method with exactly one parameter of the
+        * field type (or the field's type variable) whose lower-cased name contains
+        * "set" + the lower-cased field name and which returns void.
+        * A warning is logged for every accessor that is missing.
+        *
+        * @param f field to check
+        * @param clazz class of field
+        * @param typeHierarchy type hierarchy for materializing generic types
+        * @return true if the field is public or has both a getter and a setter, false otherwise
+        * @throws IllegalStateException if more than one getter or more than one setter matches
+        */
+       private boolean isValidPojoField(Field f, Class<?> clazz, ArrayList<Type> typeHierarchy) {
+               if(Modifier.isPublic(f.getModifiers())) {
+                       return true;
+               } else {
+                       boolean hasGetter = false, hasSetter = false;
+                       final String fieldNameLow = f.getName().toLowerCase();
+                       
+                       Type fieldType = f.getGenericType();
+                       TypeVariable<?> fieldTypeGeneric = null;
+                       if(fieldType instanceof TypeVariable) {
+                               // remember the raw type variable and additionally try to resolve it to a concrete type
+                               fieldTypeGeneric = (TypeVariable<?>) fieldType;
+                               fieldType = materializeTypeVariable(typeHierarchy, (TypeVariable<?>)fieldType);
+                       }
+                       for(Method m : clazz.getMethods()) {
+                               final String methodNameLow = m.getName().toLowerCase();
+                               
+                               // check for getter
+                               if(     // The name should be "get<FieldName>".
+                                       methodNameLow.contains("get"+fieldNameLow) &&
+                                       // no arguments for the getter
+                                       m.getParameterTypes().length == 0 &&
+                                       // return type is same as field type (or the generic variant of it).
+                                       // The parentheses are required: without them the generic alternative
+                                       // would match any method, independent of its name and parameter count.
+                                       ( m.getReturnType().equals( fieldType ) || (fieldTypeGeneric != null && m.getGenericReturnType().equals(fieldTypeGeneric) ) )
+                               ) {
+                                       if(hasGetter) {
+                                               throw new IllegalStateException("Detected more than one getters");
+                                       }
+                                       hasGetter = true;
+                               }
+                               // check for setters
+                               if(     methodNameLow.contains("set"+fieldNameLow) &&
+                                       // one parameter of the field's type
+                                       m.getParameterTypes().length == 1 &&
+                                       ( m.getParameterTypes()[0].equals( fieldType ) || (fieldTypeGeneric != null && m.getGenericParameterTypes()[0].equals(fieldTypeGeneric) ) ) &&
+                                       // return type is void.
+                                       m.getReturnType().equals(Void.TYPE)
+                               ) {
+                                       if(hasSetter) {
+                                               throw new IllegalStateException("Detected more than one setters");
+                                       }
+                                       hasSetter = true;
+                               }
+                       }
+                       if( hasGetter && hasSetter) {
+                               return true;
+                       } else {
+                               if(!hasGetter) {
+                                       LOG.warn("Class "+clazz+" does not contain a getter for field "+f.getName() );
+                               }
+                               if(!hasSetter) {
+                                       LOG.warn("Class "+clazz+" does not contain a setter for field "+f.getName() );
+                               }
+                               return false;
+                       }
+               }
+       }
 
-       @SuppressWarnings("unused")
-       private <X> TypeInformation<X> analyzePojo(Class<X> clazz) {
-               List<Field> fields = getAllDeclaredFields(clazz);
+       private <X> TypeInformation<X> analyzePojo(Class<X> clazz, 
ArrayList<Type> typeHierarchy) {
+               // try to create Type hierarchy, if the incoming one is empty.
+               if(typeHierarchy.size() == 0) {
+                       recursivelyGetTypeHierarchy(typeHierarchy, clazz, 
Object.class);
+               }
+               
+               List<Field> fields = 
removeNonObjectFields(getAllDeclaredFields(clazz));
                List<PojoField> pojoFields = new ArrayList<PojoField>();
                for (Field field : fields) {
+                       Type fieldType = field.getGenericType();
+                       if(!isValidPojoField(field, clazz, typeHierarchy)) {
+                               LOG.warn("Class "+clazz+" is not a valid POJO 
type");
+                               return null;
+                       }
                        try {
-                               if (!Modifier.isTransient(field.getModifiers()) 
&& !Modifier.isStatic(field.getModifiers())) {
-                                       pojoFields.add(new PojoField(field, 
privateCreateTypeInfo(field.getType())));
-                               }
+                               typeHierarchy.add(fieldType);
+                               pojoFields.add(new PojoField(field, 
createTypeInfoWithTypeHierarchy(typeHierarchy, fieldType, null, null) ));
                        } catch (InvalidTypesException e) {
-                               // If some of the fields cannot be analyzed, 
just return a generic type info
-                               // right now this happens when a field is an 
interface (collections are the prominent case here) or
-                               // when the POJO is generic, in which case the 
fields will have type Object.
-                               // We might fix that in the future when we use 
Kryo.
-                               return new GenericTypeInfo<X>(clazz);
+                               //pojoFields.add(new PojoField(field, new 
GenericTypeInfo( Object.class ))); // we need kryo to properly serialize this
+                               throw new InvalidTypesException("Flink is 
currently unable to serialize this type: "+fieldType+""
+                                               + "\nThe system is using the 
Avro serializer which is not able to handle all types.", e);
                        }
                }
 
-               PojoTypeInfo<X> pojoType = new PojoTypeInfo<X>(clazz, 
pojoFields);
+               CompositeType<X> pojoType = new PojoTypeInfo<X>(clazz, 
pojoFields);
 
+               //
+               // Validate the correctness of the pojo.
+               // Returning "null" will result in the creation of generic type information.
+               //
                List<Method> methods = getAllDeclaredMethods(clazz);
-               boolean containsReadObjectOrWriteObject = false;
                for (Method method : methods) {
                        if (method.getName().equals("readObject") || 
method.getName().equals("writeObject")) {
-                               containsReadObjectOrWriteObject = true;
-                               break;
+                               LOG.warn("Class "+clazz+" contains custom 
serialization methods we do not call.");
+                               return null;
                        }
                }
 
                // Try retrieving the default constructor, if it does not have 
one
                // we cannot use this because the serializer uses it.
-               boolean hasDefaultCtor = true;
                try {
                        clazz.getDeclaredConstructor();
                } catch (NoSuchMethodException e) {
-                       hasDefaultCtor = false;
+                       LOG.warn("Class "+clazz+" does not have a default 
constructor. You can not use it as a POJO");
+                       return null;
                }
-
-
-               if (!containsReadObjectOrWriteObject && hasDefaultCtor) {
-                       return pojoType;
-               }
-
-               return null;
+               
+               // everything is checked, we return the pojo
+               return pojoType;
        }
 
        // recursively determine all declared fields
@@ -950,6 +1072,19 @@ public class TypeExtractor {
                return result;
        }
 
+       /**
+        * Builds a new list that keeps, in their original order, only those fields which are
+        * neither transient nor static. The list passed in is not modified.
+        *
+        * @param fields fields to filter
+        * @return a fresh list without the transient and static fields
+        */
+       private static List<Field> removeNonObjectFields(List<Field> fields) {
+               List<Field> kept = new ArrayList<Field>();
+               for (Field candidate : fields) {
+                       final int mods = candidate.getModifiers();
+                       if (Modifier.isTransient(mods) || Modifier.isStatic(mods)) {
+                               continue;
+                       }
+                       kept.add(candidate);
+               }
+               return kept;
+       }
+       
        // recursively determine all declared methods
        private static List<Method> getAllDeclaredMethods(Class<?> clazz) {
                List<Method> result = new ArrayList<Method>();
@@ -976,6 +1111,11 @@ public class TypeExtractor {
                if (value instanceof Tuple) {
                        Tuple t = (Tuple) value;
                        int numFields = t.getArity();
+                       if(numFields != countFieldsInClass(value.getClass())) {
+                               // not a tuple since it has more fields. 
+                               return analyzePojo((Class<X>) value.getClass(), 
new ArrayList<Type>()); // we immediately call analyze Pojo here, because
+                               // there is currently no other type that can 
handle such a class.
+                       }
                        
                        TypeInformation<?>[] infos = new 
TypeInformation[numFields];
                        for (int i = 0; i < numFields; i++) {
@@ -988,10 +1128,9 @@ public class TypeExtractor {
                                
                                infos[i] = privateGetForObject(field);
                        }
-                       
                        return (TypeInformation<X>) new 
TupleTypeInfo(value.getClass(), infos);
                } else {
-                       return privateGetForClass((Class<X>) value.getClass());
+                       return privateGetForClass((Class<X>) value.getClass(), 
new ArrayList<Type>());
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
index 6894e5a..953b69c 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
@@ -87,7 +87,7 @@ public class TypeInfoParser {
                        }
                        return (TypeInformation<X>) parse(new 
StringBuilder(clearedString));
                } catch (Exception e) {
-                       throw new IllegalArgumentException("String could not be 
parsed: " + e.getMessage());
+                       throw new IllegalArgumentException("String could not be 
parsed: " + e.getMessage(), e);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
index 5045b38..b3c25e4 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
@@ -53,6 +53,11 @@ public class ValueTypeInfo<T extends Value> extends 
TypeInformation<T> implement
        }
 
        @Override
+       public int getTotalFields() {
+               // Always 1. NOTE(review): presumably a Value counts as a single flat field -- confirm against TypeInformation#getTotalFields.
+               return 1;
+       }
+       
+       @Override
        public Class<T> getTypeClass() {
                return this.type;
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
index 19bcf0b..8c9e948 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
@@ -67,6 +67,11 @@ public class WritableTypeInfo<T extends Writable> extends 
TypeInformation<T> imp
        public int getArity() {
                return 1;
        }
+       
+       /**
+        * Reports a total field count of 1, the same value {@link #getArity()} returns for this type.
+        */
+       @Override
+       public int getTotalFields() {
+               return 1;
+       }
 
        @Override
        public Class<T> getTypeClass() {
@@ -88,6 +93,20 @@ public class WritableTypeInfo<T extends Writable> extends 
TypeInformation<T> imp
                return "WritableType<" + typeClass.getName() + ">";
        }       
        
+       /**
+        * Derives the hash code from the wrapped type class, XOR-ed with a fixed constant.
+        */
+       @Override
+       public int hashCode() {
+               final int typeHash = typeClass.hashCode();
+               return typeHash ^ 0xd3a2646c;
+       }
+       
+       /**
+        * Two WritableTypeInfo instances are equal if they describe the very same type class.
+        *
+        * @param obj object to compare with, may be null
+        * @return true if obj is exactly a WritableTypeInfo for the same type class, false otherwise
+        */
+       @Override
+       public boolean equals(Object obj) {
+               // equals(null) has to return false instead of throwing a NullPointerException
+               if (obj != null && obj.getClass() == WritableTypeInfo.class) {
+                       return typeClass == ((WritableTypeInfo<?>) obj).typeClass;
+               } else {
+                       return false;
+               }
+       }
+       
        // 
--------------------------------------------------------------------------------------------
        
        static final <T extends Writable> TypeInformation<T> 
getWritableTypeInfo(Class<T> typeClass) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
index 0b7890f..9b3b191 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
@@ -43,8 +43,6 @@ public class CopyableValueComparator<T extends 
CopyableValue<T> & Comparable<T>>
        
        private transient T tempReference;
 
-       private final Comparable<?>[] extractedKey = new Comparable[1];
-
        private final TypeComparator<?>[] comparators = new TypeComparator[] 
{this};
 
        public CopyableValueComparator(boolean ascending, Class<T> type) {
@@ -126,13 +124,13 @@ public class CopyableValueComparator<T extends 
CopyableValue<T> & Comparable<T>>
        }
 
        @Override
-       public Object[] extractKeys(T record) {
-               extractedKey[0] = record;
-               return extractedKey;
+       public int extractKeys(Object record, Object[] target, int index) {
+               target[index] = record;
+               return 1;
        }
 
        @Override
-       public TypeComparator<?>[] getComparators() {
+       public TypeComparator<?>[] getFlatComparators() {
                return comparators;
        }
        

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
index 66cbdf4..2d3ce39 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
@@ -53,9 +53,6 @@ public class GenericTypeComparator<T extends Comparable<T>> 
extends TypeComparat
        private transient Kryo kryo;
 
        @SuppressWarnings("rawtypes")
-       private final Comparable[] extractedKey = new Comparable[1];
-
-       @SuppressWarnings("rawtypes")
        private final TypeComparator[] comparators = new TypeComparator[] 
{this};
 
        // 
------------------------------------------------------------------------
@@ -171,14 +168,14 @@ public class GenericTypeComparator<T extends 
Comparable<T>> extends TypeComparat
        }
 
        @Override
-       public Object[] extractKeys(T record) {
-               extractedKey[0] = record;
-               return extractedKey;
+       public int extractKeys(Object record, Object[] target, int index) {
+               target[index] = record;
+               return 1;
        }
 
-       @Override
        @SuppressWarnings("rawtypes")
-       public TypeComparator[] getComparators() {
+       @Override
+       public TypeComparator[] getFlatComparators() {
                return comparators;
        }
 

Reply via email to