http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
index 024eb71..de59c36 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.typeutils;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -38,12 +39,12 @@ public class EnumTypeInfo<T extends Enum<T>> extends 
TypeInformation<T> implemen
        private final Class<T> typeClass;
 
        public EnumTypeInfo(Class<T> typeClass) {
-               if (typeClass == null) {
-                       throw new NullPointerException();
-               }
+               Preconditions.checkNotNull(typeClass, "Enum type class must not 
be null.");
+
                if (!Enum.class.isAssignableFrom(typeClass) ) {
                        throw new IllegalArgumentException("EnumTypeInfo can 
only be used for subclasses of " + Enum.class.getName());
                }
+
                this.typeClass = typeClass;
        }
 
@@ -98,13 +99,22 @@ public class EnumTypeInfo<T extends Enum<T>> extends 
TypeInformation<T> implemen
        
        @Override
        public int hashCode() {
-               return typeClass.hashCode() ^ 0xd3a2646c;
+               return typeClass.hashCode();
        }
-       
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof EnumTypeInfo;
+       }
+
        @Override
        public boolean equals(Object obj) {
                if (obj instanceof EnumTypeInfo) {
-                       return typeClass == ((EnumTypeInfo<?>) obj).typeClass;
+                       @SuppressWarnings("unchecked")
+                       EnumTypeInfo<T> enumTypeInfo = (EnumTypeInfo<T>) obj;
+
+                       return enumTypeInfo.canEqual(this) &&
+                               typeClass == enumTypeInfo.typeClass;
                } else {
                        return false;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index 5caf8f2..7e7aa68 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.typeutils;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -34,7 +35,7 @@ public class GenericTypeInfo<T> extends TypeInformation<T> 
implements AtomicType
        private final Class<T> typeClass;
 
        public GenericTypeInfo(Class<T> typeClass) {
-               this.typeClass = typeClass;
+               this.typeClass = Preconditions.checkNotNull(typeClass);
        }
 
        @Override
@@ -88,13 +89,21 @@ public class GenericTypeInfo<T> extends TypeInformation<T> 
implements AtomicType
 
        @Override
        public int hashCode() {
-               return typeClass.hashCode() ^ 0x165667b1;
+               return typeClass.hashCode();
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof GenericTypeInfo;
        }
 
        @Override
        public boolean equals(Object obj) {
-               if (obj.getClass() == GenericTypeInfo.class) {
-                       return typeClass == ((GenericTypeInfo<?>) 
obj).typeClass;
+               if (obj instanceof GenericTypeInfo) {
+                       @SuppressWarnings("unchecked")
+                       GenericTypeInfo<T> genericTypeInfo = 
(GenericTypeInfo<T>) obj;
+
+                       return typeClass == genericTypeInfo.typeClass;
                } else {
                        return false;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
index 227c68c..1dd7f01 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
@@ -31,8 +31,8 @@ public class MissingTypeInfo extends 
TypeInformation<InvalidTypesException> {
 
        private static final long serialVersionUID = -4212082837126702723L;
        
-       private String functionName;
-       private InvalidTypesException typeException;
+       private final String functionName;
+       private final InvalidTypesException typeException;
 
        
        public MissingTypeInfo(String functionName) {
@@ -87,6 +87,34 @@ public class MissingTypeInfo extends 
TypeInformation<InvalidTypesException> {
        }
 
        @Override
+       public String toString() {
+               return getClass().getSimpleName() + "<" + functionName + ", " + 
typeException.getMessage() + ">";
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof MissingTypeInfo) {
+                       MissingTypeInfo missingTypeInfo = (MissingTypeInfo) obj;
+
+                       return missingTypeInfo.canEqual(this) &&
+                               
functionName.equals(missingTypeInfo.functionName) &&
+                               
typeException.equals(missingTypeInfo.typeException);
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               return 31 * functionName.hashCode() + typeException.hashCode();
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof MissingTypeInfo;
+       }
+
+       @Override
        public int getTotalFields() {
                throw new UnsupportedOperationException("The missing type 
information cannot be used as a type information.");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
index 6806122..150c976 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
@@ -19,13 +19,9 @@
 package org.apache.flink.api.java.typeutils;
 
 import java.lang.reflect.Array;
-import java.lang.reflect.GenericArrayType;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
@@ -34,21 +30,12 @@ public class ObjectArrayTypeInfo<T, C> extends 
TypeInformation<T> {
 
        private static final long serialVersionUID = 1L;
        
-       private final Type arrayType;
-       private final Type componentType;
+       private final Class<T> arrayType;
        private final TypeInformation<C> componentInfo;
 
-       @SuppressWarnings("unchecked")
-       private ObjectArrayTypeInfo(Type arrayType, Type componentType) {
-               this.arrayType = arrayType;
-               this.componentType = componentType;
-               this.componentInfo = (TypeInformation<C>) 
TypeExtractor.createTypeInfo(componentType);
-       }
-
-       private ObjectArrayTypeInfo(Type arrayType, Type componentType, 
TypeInformation<C> componentInfo) {
-               this.arrayType = arrayType;
-               this.componentType = componentType;
-               this.componentInfo = componentInfo;
+       private ObjectArrayTypeInfo(Class<T> arrayType, TypeInformation<C> 
componentInfo) {
+               this.arrayType = Preconditions.checkNotNull(arrayType);
+               this.componentInfo = Preconditions.checkNotNull(componentInfo);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -76,29 +63,9 @@ public class ObjectArrayTypeInfo<T, C> extends 
TypeInformation<T> {
        @SuppressWarnings("unchecked")
        @Override
        public Class<T> getTypeClass() {
-               // if arrayType is a Class
-               if (arrayType instanceof Class) {
-                       return (Class<T>) arrayType;
-               }
-
-               // if arrayType is a GenericArrayType
-               Class<?> componentClass = (Class<?>) ((ParameterizedType) 
componentType).getRawType();
-
-               try {
-                       return (Class<T>) Class.forName("[L" + 
componentClass.getName() + ";");
-               } catch (ClassNotFoundException e) {
-                       throw new RuntimeException("Cannot create non-generic 
type class.", e);
-               }
-       }
-
-       public Type getType() {
                return arrayType;
        }
 
-       public Type getComponentType() {
-               return this.componentType;
-       }
-
        public TypeInformation<C> getComponentInfo() {
                return componentInfo;
        }
@@ -111,15 +78,9 @@ public class ObjectArrayTypeInfo<T, C> extends 
TypeInformation<T> {
        @SuppressWarnings("unchecked")
        @Override
        public TypeSerializer<T> createSerializer(ExecutionConfig 
executionConfig) {
-               // use raw type for serializer if generic array type
-               if (this.componentType instanceof GenericArrayType) {
-                       ParameterizedType paramType = (ParameterizedType) 
((GenericArrayType) this.componentType).getGenericComponentType();
-
-                       return (TypeSerializer<T>) new 
GenericArraySerializer<C>((Class<C>) paramType.getRawType(),
-                                       
this.componentInfo.createSerializer(executionConfig));
-               } else {
-                       return (TypeSerializer<T>) new 
GenericArraySerializer<C>((Class<C>) this.componentType, 
this.componentInfo.createSerializer(executionConfig));
-               }
+               return (TypeSerializer<T>) new GenericArraySerializer<C>(
+                       componentInfo.getTypeClass(),
+                       componentInfo.createSerializer(executionConfig));
        }
 
        @Override
@@ -128,38 +89,37 @@ public class ObjectArrayTypeInfo<T, C> extends 
TypeInformation<T> {
        }
 
        @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
+       public boolean equals(Object obj) {
+               if (obj instanceof ObjectArrayTypeInfo) {
+                       @SuppressWarnings("unchecked")
+                       ObjectArrayTypeInfo<T, C> objectArrayTypeInfo = 
(ObjectArrayTypeInfo<T, C>)obj;
+
+                       return objectArrayTypeInfo.canEqual(this) &&
+                               arrayType == objectArrayTypeInfo.arrayType &&
+                               
componentInfo.equals(objectArrayTypeInfo.componentInfo);
+               } else {
                        return false;
                }
+       }
 
-               ObjectArrayTypeInfo<?, ?> that = (ObjectArrayTypeInfo<?, ?>) o;
-               return this.arrayType.equals(that.arrayType) && 
this.componentType.equals(that.componentType);
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof ObjectArrayTypeInfo;
        }
 
        @Override
        public int hashCode() {
-               return 31 * this.arrayType.hashCode() + 
this.componentType.hashCode();
+               return 31 * this.arrayType.hashCode() + 
this.componentInfo.hashCode();
        }
 
        // 
--------------------------------------------------------------------------------------------
 
-       public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Type type, 
TypeInformation<C> componentInfo) {
-               // generic array type e.g. for Tuples
-               if (type instanceof GenericArrayType) {
-                       GenericArrayType genericArray = (GenericArrayType) type;
-                       return new ObjectArrayTypeInfo<T, C>(type, 
genericArray.getGenericComponentType(), componentInfo);
-               }
-               // for tuples without generics (e.g. generated by the 
TypeInformation parser)
-               // and multidimensional arrays (e.g. in scala)
-               else if (type instanceof Class<?> && ((Class<?>) type).isArray()
-                               && BasicTypeInfo.getInfoFor((Class<?>) type) == 
null) {
-                       return new ObjectArrayTypeInfo<T, C>(type, ((Class<?>) 
type).getComponentType(), componentInfo);
-               }
-               throw new InvalidTypesException("The given type is not a valid 
object array.");
+       public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Class<T> 
arrayClass, TypeInformation<C> componentInfo) {
+               Preconditions.checkNotNull(arrayClass);
+               Preconditions.checkNotNull(componentInfo);
+               Preconditions.checkArgument(arrayClass.isArray(), "Class " + 
arrayClass + " must be an array.");
+
+               return new ObjectArrayTypeInfo<T, C>(arrayClass, componentInfo);
        }
 
        /**
@@ -170,20 +130,12 @@ public class ObjectArrayTypeInfo<T, C> extends 
TypeInformation<T> {
         * This must be used in cases where the complete type of the array is 
not available as a
         * {@link java.lang.reflect.Type} or {@link java.lang.Class}.
         */
+       @SuppressWarnings("unchecked")
        public static <T, C> ObjectArrayTypeInfo<T, C> 
getInfoFor(TypeInformation<C> componentInfo) {
-               return new ObjectArrayTypeInfo<T, C>(
-                               Array.newInstance(componentInfo.getTypeClass(), 
0).getClass(),
-                               componentInfo.getTypeClass(),
-                               componentInfo);
-       }
+               Preconditions.checkNotNull(componentInfo);
 
-       @SuppressWarnings("unchecked")
-       public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Type type) {
-               // class type e.g. for POJOs
-               if (type instanceof Class<?> && ((Class<?>) type).isArray() && 
BasicTypeInfo.getInfoFor((Class<C>) type) == null) {
-                       Class<C> array = (Class<C>) type;
-                       return new ObjectArrayTypeInfo<T, C>(type, 
array.getComponentType());
-               }
-               throw new InvalidTypesException("The given type is not a valid 
object array.");
+               return new ObjectArrayTypeInfo<T, C>(
+                       
(Class<T>)Array.newInstance(componentInfo.getTypeClass(), 0).getClass(),
+                       componentInfo);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
index 91a7a5e..4ad0ac2 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
@@ -23,16 +23,29 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.lang.reflect.Field;
+import java.util.Objects;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 public class PojoField implements Serializable {
-       public transient Field field;
-       public TypeInformation<?> type;
+
+       private static final long serialVersionUID = 1975295846436559363L;
+
+       private transient Field field;
+       private final TypeInformation<?> type;
 
        public PojoField(Field field, TypeInformation<?> type) {
-               this.field = field;
-               this.type = type;
+               this.field = Preconditions.checkNotNull(field);
+               this.type = Preconditions.checkNotNull(type);
+       }
+
+       public Field getField() {
+               return field;
+       }
+
+       public TypeInformation<?> getTypeInformation() {
+               return type;
        }
 
        private void writeObject(ObjectOutputStream out)
@@ -68,4 +81,25 @@ public class PojoField implements Serializable {
        public String toString() {
                return "PojoField " + field.getDeclaringClass() + "." + 
field.getName() + " (" + type + ")";
        }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof PojoField) {
+                       PojoField other = (PojoField) obj;
+
+                       return other.canEqual(this) && type.equals(other.type) 
&&
+                               Objects.equals(field, other.field);
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(field, type);
+       }
+
+       public boolean canEqual(Object obj) {
+               return obj instanceof PojoField;
+       }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 273a907..f7e4e42 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -22,12 +22,12 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
@@ -40,8 +40,6 @@ import 
org.apache.flink.api.java.operators.Keys.ExpressionKeys;
 
 import com.google.common.base.Joiner;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * TypeInformation for "Java Beans"-style types. Flink refers to them as POJOs,
@@ -59,8 +57,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
        
        private static final long serialVersionUID = 1L;
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(PojoTypeInfo.class);
-
        private final static String REGEX_FIELD = 
"[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*";
        private final static String REGEX_NESTED_FIELDS = 
"("+REGEX_FIELD+")(\\.(.+))?";
        private final static String REGEX_NESTED_FIELDS_WILDCARD = 
REGEX_NESTED_FIELDS
@@ -70,31 +66,32 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
        private static final Pattern PATTERN_NESTED_FIELDS = 
Pattern.compile(REGEX_NESTED_FIELDS);
        private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = 
Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
 
-       private final Class<T> typeClass;
-
-       private PojoField[] fields;
+       private final PojoField[] fields;
        
-       private int totalFields;
+       private final int totalFields;
 
        public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
                super(typeClass);
-               this.typeClass = typeClass;
-               List<PojoField> tempFields = new ArrayList<PojoField>(fields);
-               Collections.sort(tempFields, new Comparator<PojoField>() {
+
+               
Preconditions.checkArgument(Modifier.isPublic(typeClass.getModifiers()),
+                               "POJO " + typeClass + " is not public");
+
+               this.fields = fields.toArray(new PojoField[fields.size()]);
+
+               Arrays.sort(this.fields, new Comparator<PojoField>() {
                        @Override
                        public int compare(PojoField o1, PojoField o2) {
-                               return 
o1.field.getName().compareTo(o2.field.getName());
+                               return 
o1.getField().getName().compareTo(o2.getField().getName());
                        }
                });
-               this.fields = tempFields.toArray(new 
PojoField[tempFields.size()]);
-               
-               // check if POJO is public
-               if(!Modifier.isPublic(typeClass.getModifiers())) {
-                       throw new RuntimeException("POJO "+typeClass+" is not 
public");
-               }
+
+               int counterFields = 0;
+
                for(PojoField field : fields) {
-                       totalFields += field.type.getTotalFields();
+                       counterFields += 
field.getTypeInformation().getTotalFields();
                }
+
+               totalFields = counterFields;
        }
 
        @Override
@@ -119,11 +116,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
        }
 
        @Override
-       public Class<T> getTypeClass() {
-               return typeClass;
-       }
-
-       @Override
        public boolean isSortKeyType() {
                // Support for sorting POJOs that implement Comparable is not 
implemented yet.
                // Since the order of fields in a POJO type is not well 
defined, sorting on fields
@@ -145,12 +137,16 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
                        // handle select all
                        int keyPosition = 0;
                        for(PojoField pField : fields) {
-                               if(pField.type instanceof CompositeType) {
-                                       CompositeType<?> cType = 
(CompositeType<?>)pField.type;
+                               if(pField.getTypeInformation() instanceof 
CompositeType) {
+                                       CompositeType<?> cType = 
(CompositeType<?>)pField.getTypeInformation();
                                        
cType.getFlatFields(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + 
keyPosition, result);
                                        keyPosition += cType.getTotalFields()-1;
                                } else {
-                                       result.add(new 
NamedFlatFieldDescriptor(pField.field.getName(), offset + keyPosition, 
pField.type));
+                                       result.add(
+                                               new NamedFlatFieldDescriptor(
+                                                       
pField.getField().getName(),
+                                                       offset + keyPosition,
+                                                       
pField.getTypeInformation()));
                                }
                                keyPosition++;
                        }
@@ -163,9 +159,9 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
                int fieldPos = -1;
                TypeInformation<?> fieldType = null;
                for (int i = 0; i < fields.length; i++) {
-                       if (fields[i].field.getName().equals(field)) {
+                       if (fields[i].getField().getName().equals(field)) {
                                fieldPos = i;
-                               fieldType = fields[i].type;
+                               fieldType = fields[i].getTypeInformation();
                                break;
                        }
                }
@@ -181,7 +177,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
                                }
                                // add all fields of composite type
                                ((CompositeType<?>) 
fieldType).getFlatFields("*", offset, result);
-                               return;
                        } else {
                                // we found the field to add
                                // compute flat field position by adding 
skipped fields
@@ -190,8 +185,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
                                        flatFieldPos += 
this.getTypeAt(i).getTotalFields();
                                }
                                result.add(new 
FlatFieldDescriptor(flatFieldPos, fieldType));
-                               // nothing left to do
-                               return;
                        }
                } else {
                        if(fieldType instanceof CompositeType<?>) {
@@ -200,8 +193,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
                                        offset += 
this.getTypeAt(i).getTotalFields();
                                }
                                ((CompositeType<?>) 
fieldType).getFlatFields(tail, offset, result);
-                               // nothing left to do
-                               return;
                        } else {
                                throw new 
InvalidFieldReferenceException("Nested field expression \""+tail+"\" not 
possible on atomic type "+fieldType+".");
                        }
@@ -226,9 +217,9 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
                int fieldPos = -1;
                TypeInformation<?> fieldType = null;
                for (int i = 0; i < fields.length; i++) {
-                       if (fields[i].field.getName().equals(field)) {
+                       if (fields[i].getField().getName().equals(field)) {
                                fieldPos = i;
-                               fieldType = fields[i].type;
+                               fieldType = fields[i].getTypeInformation();
                                break;
                        }
                }
@@ -255,10 +246,15 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
                        throw new IndexOutOfBoundsException();
                }
                @SuppressWarnings("unchecked")
-               TypeInformation<X> typed = (TypeInformation<X>) 
fields[pos].type;
+               TypeInformation<X> typed = (TypeInformation<X>) 
fields[pos].getTypeInformation();
                return typed;
        }
 
+       @Override
+       protected TypeComparatorBuilder<T> createTypeComparatorBuilder() {
+               return new PojoTypeComparatorBuilder();
+       }
+
        // used for testing. Maybe use mockito here
        public PojoField getPojoFieldAt(int pos) {
                if (pos < 0 || pos >= this.fields.length) {
@@ -267,42 +263,10 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
                return this.fields[pos];
        }
 
-       /**
-        * Comparator creation
-        */
-       private TypeComparator<?>[] fieldComparators;
-       private Field[] keyFields;
-       private int comparatorHelperIndex = 0;
-       @Override
-       protected void initializeNewComparator(int keyCount) {
-               fieldComparators = new TypeComparator<?>[keyCount];
-               keyFields = new Field[keyCount];
-               comparatorHelperIndex = 0;
-       }
-
-       @Override
-       protected void addCompareField(int fieldId, TypeComparator<?> 
comparator) {
-               fieldComparators[comparatorHelperIndex] = comparator;
-               keyFields[comparatorHelperIndex] = fields[fieldId].field;
-               comparatorHelperIndex++;
-       }
-
-       @Override
-       protected TypeComparator<T> getNewComparator(ExecutionConfig config) {
-               // first remove the null array fields
-               final Field[] finalKeyFields = Arrays.copyOf(keyFields, 
comparatorHelperIndex);
-               @SuppressWarnings("rawtypes")
-               final TypeComparator[] finalFieldComparators = 
Arrays.copyOf(fieldComparators, comparatorHelperIndex);
-               if(finalFieldComparators.length == 0 || finalKeyFields.length 
== 0 ||  finalFieldComparators.length != finalKeyFields.length) {
-                       throw new IllegalArgumentException("Pojo comparator 
creation has a bug");
-               }
-               return new PojoComparator<T>(finalKeyFields, 
finalFieldComparators, createSerializer(config), typeClass);
-       }
-
        public String[] getFieldNames() {
                String[] result = new String[fields.length];
                for (int i = 0; i < fields.length; i++) {
-                       result[i] = fields[i].field.getName();
+                       result[i] = fields[i].getField().getName();
                }
                return result;
        }
@@ -310,7 +274,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
        @Override
        public int getFieldIndex(String fieldName) {
                for (int i = 0; i < fields.length; i++) {
-                       if (fields[i].field.getName().equals(fieldName)) {
+                       if (fields[i].getField().getName().equals(fieldName)) {
                                return i;
                        }
                }
@@ -320,46 +284,106 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
        @Override
        public TypeSerializer<T> createSerializer(ExecutionConfig config) {
                if(config.isForceKryoEnabled()) {
-                       return new KryoSerializer<T>(this.typeClass, config);
+                       return new KryoSerializer<T>(getTypeClass(), config);
                }
                if(config.isForceAvroEnabled()) {
-                       return new AvroSerializer<T>(this.typeClass);
+                       return new AvroSerializer<T>(getTypeClass());
                }
 
                TypeSerializer<?>[] fieldSerializers = new 
TypeSerializer<?>[fields.length ];
                Field[] reflectiveFields = new Field[fields.length];
 
                for (int i = 0; i < fields.length; i++) {
-                       fieldSerializers[i] = 
fields[i].type.createSerializer(config);
-                       reflectiveFields[i] = fields[i].field;
+                       fieldSerializers[i] = 
fields[i].getTypeInformation().createSerializer(config);
+                       reflectiveFields[i] = fields[i].getField();
                }
 
-               return new PojoSerializer<T>(this.typeClass, fieldSerializers, 
reflectiveFields, config);
+               return new PojoSerializer<T>(getTypeClass(), fieldSerializers, 
reflectiveFields, config);
        }
-
-       // 
--------------------------------------------------------------------------------------------
        
        @Override
        public boolean equals(Object obj) {
-               return (obj instanceof PojoTypeInfo) && ((PojoTypeInfo<?>) 
obj).typeClass == this.typeClass;
+               if (obj instanceof PojoTypeInfo) {
+                       @SuppressWarnings("unchecked")
+                       PojoTypeInfo<T> pojoTypeInfo = (PojoTypeInfo<T>)obj;
+
+                       return pojoTypeInfo.canEqual(this) &&
+                               super.equals(pojoTypeInfo) &&
+                               Arrays.equals(fields, pojoTypeInfo.fields) &&
+                               totalFields == pojoTypeInfo.totalFields;
+               } else {
+                       return false;
+               }
        }
        
        @Override
        public int hashCode() {
-               return typeClass.hashCode() + 1387562934;
+               return 31 * (31 * Arrays.hashCode(fields) + totalFields) + 
super.hashCode();
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof PojoTypeInfo;
        }
        
        @Override
        public String toString() {
                List<String> fieldStrings = new ArrayList<String>();
                for (PojoField field : fields) {
-                       fieldStrings.add(field.field.getName() + ": " + 
field.type.toString());
+                       fieldStrings.add(field.getField().getName() + ": " + 
field.getTypeInformation().toString());
                }
-               return "PojoType<" + typeClass.getName()
+               return "PojoType<" + getTypeClass().getName()
                                + ", fields = [" + Joiner.on(", 
").join(fieldStrings) + "]"
                                + ">";
        }
 
+       // 
--------------------------------------------------------------------------------------------
+
+       private class PojoTypeComparatorBuilder implements 
TypeComparatorBuilder<T> {
+
+               private ArrayList<TypeComparator> fieldComparators;
+               private ArrayList<Field> keyFields;
+
+               public PojoTypeComparatorBuilder() {
+                       fieldComparators = new ArrayList<TypeComparator>();
+                       keyFields = new ArrayList<Field>();
+               }
+
+
+               @Override
+               public void initializeTypeComparatorBuilder(int size) {
+                       fieldComparators.ensureCapacity(size);
+                       keyFields.ensureCapacity(size);
+               }
+
+               @Override
+               public void addComparatorField(int fieldId, TypeComparator<?> 
comparator) {
+                       fieldComparators.add(comparator);
+                       keyFields.add(fields[fieldId].getField());
+               }
+
+               @Override
+               public TypeComparator<T> createTypeComparator(ExecutionConfig 
config) {
+                       Preconditions.checkState(
+                               keyFields.size() > 0,
+                               "No keys were defined for the 
PojoTypeComparatorBuilder.");
+
+                       Preconditions.checkState(
+                               fieldComparators.size() > 0,
+                               "No type comparators were defined for the 
PojoTypeComparatorBuilder.");
+
+                       Preconditions.checkState(
+                               keyFields.size() == fieldComparators.size(),
+                               "Number of key fields and field comparators is 
not equal.");
+
+                       return new PojoComparator<T>(
+                               keyFields.toArray(new Field[keyFields.size()]),
+                               fieldComparators.toArray(new 
TypeComparator[fieldComparators.size()]),
+                               createSerializer(config),
+                               getTypeClass());
+               }
+       }
+
        public static class NamedFlatFieldDescriptor extends 
FlatFieldDescriptor {
 
                private String fieldName;

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
index 9857eb6..e9ce102 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
@@ -70,12 +70,22 @@ public class RecordTypeInfo extends TypeInformation<Record> 
{
        
        @Override
        public int hashCode() {
-               return Record.class.hashCode() ^ 0x165667b1;
+               return Record.class.hashCode();
        }
-       
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof RecordTypeInfo;
+       }
+
        @Override
        public boolean equals(Object obj) {
-               return obj.getClass() == RecordTypeInfo.class;
+               if (obj instanceof RecordTypeInfo) {
+                       RecordTypeInfo recordTypeInfo = (RecordTypeInfo) obj;
+                       return recordTypeInfo.canEqual(this);
+               } else {
+                       return false;
+               }
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
index 618b190..30710e5 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
@@ -18,8 +18,12 @@
 
 package org.apache.flink.api.java.typeutils;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -52,10 +56,13 @@ public final class TupleTypeInfo<T extends Tuple> extends 
TupleTypeInfoBase<T> {
 
        public TupleTypeInfo(Class<T> tupleType, TypeInformation<?>... types) {
                super(tupleType, types);
-               if (types == null || types.length > Tuple.MAX_ARITY) {
-                       throw new IllegalArgumentException();
-               }
+
+               Preconditions.checkArgument(
+                       types.length <= Tuple.MAX_ARITY,
+                       "The tuple type exceeds the maximum supported arity.");
+
                this.fieldNames = new String[types.length];
+
                for (int i = 0; i < types.length; i++) {
                        fieldNames[i] = "f" + i;
                }
@@ -78,7 +85,7 @@ public final class TupleTypeInfo<T extends Tuple> extends 
TupleTypeInfoBase<T> {
        @SuppressWarnings("unchecked")
        @Override
        public TupleSerializer<T> createSerializer(ExecutionConfig 
executionConfig) {
-               if (this.tupleType == Tuple0.class) {
+               if (getTypeClass() == Tuple0.class) {
                        return (TupleSerializer<T>) Tuple0Serializer.INSTANCE;
                }
 
@@ -91,48 +98,65 @@ public final class TupleTypeInfo<T extends Tuple> extends 
TupleTypeInfoBase<T> {
                
                return new TupleSerializer<T>(tupleClass, fieldSerializers);
        }
-       
-       /**
-        * Comparator creation
-        */
-       private TypeComparator<?>[] fieldComparators;
-       private int[] logicalKeyFields;
-       private int comparatorHelperIndex = 0;
-       
-       @Override
-       protected void initializeNewComparator(int localKeyCount) {
-               fieldComparators = new TypeComparator<?>[localKeyCount];
-               logicalKeyFields = new int[localKeyCount];
-               comparatorHelperIndex = 0;
-       }
 
        @Override
-       protected void addCompareField(int fieldId, TypeComparator<?> 
comparator) {
-               fieldComparators[comparatorHelperIndex] = comparator;
-               logicalKeyFields[comparatorHelperIndex] = fieldId;
-               comparatorHelperIndex++;
+       protected TypeComparatorBuilder<T> createTypeComparatorBuilder() {
+               return new TupleTypeComparatorBuilder();
        }
 
-       @Override
-       protected TypeComparator<T> getNewComparator(ExecutionConfig 
executionConfig) {
-               @SuppressWarnings("rawtypes")
-               final TypeComparator[] finalFieldComparators = 
Arrays.copyOf(fieldComparators, comparatorHelperIndex);
-               final int[] finalLogicalKeyFields = 
Arrays.copyOf(logicalKeyFields, comparatorHelperIndex);
-               //final TypeSerializer[] finalFieldSerializers = 
Arrays.copyOf(fieldSerializers, comparatorHelperIndex);
-               // create the serializers for the prefix up to highest key 
position
-               int maxKey = 0;
-               for(int key : finalLogicalKeyFields) {
-                       maxKey = Math.max(maxKey, key);
+       private class TupleTypeComparatorBuilder implements 
TypeComparatorBuilder<T> {
+
+               private final ArrayList<TypeComparator> fieldComparators = new 
ArrayList<TypeComparator>();
+               private final ArrayList<Integer> logicalKeyFields = new 
ArrayList<Integer>();
+
+               @Override
+               public void initializeTypeComparatorBuilder(int size) {
+                       fieldComparators.ensureCapacity(size);
+                       logicalKeyFields.ensureCapacity(size);
                }
-               TypeSerializer<?>[] fieldSerializers = new 
TypeSerializer<?>[maxKey + 1];
-               for (int i = 0; i <= maxKey; i++) {
-                       fieldSerializers[i] = 
types[i].createSerializer(executionConfig);
+
+               @Override
+               public void addComparatorField(int fieldId, TypeComparator<?> 
comparator) {
+                       fieldComparators.add(comparator);
+                       logicalKeyFields.add(fieldId);
                }
-               if(finalFieldComparators.length == 0 || 
finalLogicalKeyFields.length == 0 || fieldSerializers.length == 0 
-                               || finalFieldComparators.length != 
finalLogicalKeyFields.length) {
-                       throw new IllegalArgumentException("Tuple comparator 
creation has a bug");
+
+               @Override
+               public TypeComparator<T> createTypeComparator(ExecutionConfig 
config) {
+                       Preconditions.checkState(
+                               fieldComparators.size() > 0,
+                               "No field comparators were defined for the 
TupleTypeComparatorBuilder."
+                       );
+
+                       Preconditions.checkState(
+                               logicalKeyFields.size() > 0,
+                               "No key fields were defined for the 
TupleTypeComparatorBuilder."
+                       );
+
+                       Preconditions.checkState(
+                               fieldComparators.size() == 
logicalKeyFields.size(),
+                               "The number of field comparators and key fields 
is not equal."
+                       );
+
+                       final int maxKey = Collections.max(logicalKeyFields);
+
+                       Preconditions.checkState(
+                               maxKey >= 0,
+                               "The maximum key field must be greater or equal 
than 0."
+                       );
+
+                       TypeSerializer<?>[] fieldSerializers = new 
TypeSerializer<?>[maxKey + 1];
+
+                       for (int i = 0; i <= maxKey; i++) {
+                               fieldSerializers[i] = 
types[i].createSerializer(config);
+                       }
+
+                       return new TupleComparator<T>(
+                               Ints.toArray(logicalKeyFields),
+                               fieldComparators.toArray(new 
TypeComparator[fieldComparators.size()]),
+                               fieldSerializers
+                       );
                }
-               return new TupleComparator<T>(finalLogicalKeyFields, 
finalFieldComparators, fieldSerializers);
        }
        
        // 
--------------------------------------------------------------------------------------------
@@ -142,17 +166,22 @@ public final class TupleTypeInfo<T extends Tuple> extends 
TupleTypeInfoBase<T> {
                if (obj instanceof TupleTypeInfo) {
                        @SuppressWarnings("unchecked")
                        TupleTypeInfo<T> other = (TupleTypeInfo<T>) obj;
-                       return ((this.tupleType == null && other.tupleType == 
null) || this.tupleType.equals(other.tupleType)) &&
-                                       Arrays.deepEquals(this.types, 
other.types);
-                       
+                       return other.canEqual(this) &&
+                               super.equals(other) &&
+                               Arrays.equals(fieldNames, other.fieldNames);
                } else {
                        return false;
                }
        }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof TupleTypeInfo;
+       }
        
        @Override
        public int hashCode() {
-               return this.types.hashCode() ^ Arrays.deepHashCode(this.types);
+               return 31 * super.hashCode() + Arrays.hashCode(fieldNames);
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index a2d937f..469476f 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
@@ -45,17 +46,20 @@ public abstract class TupleTypeInfoBase<T> extends 
CompositeType<T> {
        
        protected final TypeInformation<?>[] types;
        
-       protected final Class<T> tupleType;
-
-       private int totalFields;
+       private final int totalFields;
 
        public TupleTypeInfoBase(Class<T> tupleType, TypeInformation<?>... 
types) {
                super(tupleType);
-               this.tupleType = tupleType;
-               this.types = types;
+
+               this.types = Preconditions.checkNotNull(types);
+
+               int fieldCounter = 0;
+
                for(TypeInformation<?> type : types) {
-                       totalFields += type.getTotalFields();
+                       fieldCounter += type.getTotalFields();
                }
+
+               totalFields = fieldCounter;
        }
 
        @Override
@@ -83,11 +87,6 @@ public abstract class TupleTypeInfoBase<T> extends 
CompositeType<T> {
        }
 
        @Override
-       public Class<T> getTypeClass() {
-               return tupleType;
-       }
-
-       @Override
        public void getFlatFields(String fieldExpression, int offset, 
List<FlatFieldDescriptor> result) {
 
                Matcher matcher = 
PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
@@ -109,53 +108,49 @@ public abstract class TupleTypeInfoBase<T> extends 
CompositeType<T> {
                                }
                                keyPosition++;
                        }
-                       return;
-               }
+               } else {
+                       String fieldStr = matcher.group(1);
+                       Matcher fieldMatcher = PATTERN_FIELD.matcher(fieldStr);
 
-               String fieldStr = matcher.group(1);
-               Matcher fieldMatcher = PATTERN_FIELD.matcher(fieldStr);
-               if (!fieldMatcher.matches()) {
-                       throw new RuntimeException("Invalid matcher pattern");
-               }
-               field = fieldMatcher.group(2);
-               int fieldPos = Integer.valueOf(field);
+                       if (!fieldMatcher.matches()) {
+                               throw new RuntimeException("Invalid matcher 
pattern");
+                       }
 
-               if (fieldPos >= this.getArity()) {
-                       throw new InvalidFieldReferenceException("Tuple field 
expression \"" + fieldStr + "\" out of bounds of " + this.toString() + ".");
-               }
-               TypeInformation<?> fieldType = this.getTypeAt(fieldPos);
-               String tail = matcher.group(5);
-               if(tail == null) {
-                       if(fieldType instanceof CompositeType) {
-                               // forward offsets
-                               for(int i=0; i<fieldPos; i++) {
-                                       offset += 
this.getTypeAt(i).getTotalFields();
-                               }
-                               // add all fields of composite type
-                               ((CompositeType<?>) 
fieldType).getFlatFields("*", offset, result);
-                               return;
-                       } else {
-                               // we found the field to add
-                               // compute flat field position by adding 
skipped fields
-                               int flatFieldPos = offset;
-                               for(int i=0; i<fieldPos; i++) {
-                                       flatFieldPos += 
this.getTypeAt(i).getTotalFields();
-                               }
-                               result.add(new 
FlatFieldDescriptor(flatFieldPos, fieldType));
-                               // nothing left to do
-                               return;
+                       field = fieldMatcher.group(2);
+                       int fieldPos = Integer.valueOf(field);
+
+                       if (fieldPos >= this.getArity()) {
+                               throw new InvalidFieldReferenceException("Tuple 
field expression \"" + fieldStr + "\" out of bounds of " + this.toString() + 
".");
                        }
-               } else {
-                       if(fieldType instanceof CompositeType<?>) {
-                               // forward offset
-                               for(int i=0; i<fieldPos; i++) {
-                                       offset += 
this.getTypeAt(i).getTotalFields();
+                       TypeInformation<?> fieldType = this.getTypeAt(fieldPos);
+                       String tail = matcher.group(5);
+                       if (tail == null) {
+                               if (fieldType instanceof CompositeType) {
+                                       // forward offsets
+                                       for (int i = 0; i < fieldPos; i++) {
+                                               offset += 
this.getTypeAt(i).getTotalFields();
+                                       }
+                                       // add all fields of composite type
+                                       ((CompositeType<?>) 
fieldType).getFlatFields("*", offset, result);
+                               } else {
+                                       // we found the field to add
+                                       // compute flat field position by 
adding skipped fields
+                                       int flatFieldPos = offset;
+                                       for (int i = 0; i < fieldPos; i++) {
+                                               flatFieldPos += 
this.getTypeAt(i).getTotalFields();
+                                       }
+                                       result.add(new 
FlatFieldDescriptor(flatFieldPos, fieldType));
                                }
-                               ((CompositeType<?>) 
fieldType).getFlatFields(tail, offset, result);
-                               // nothing left to do
-                               return;
                        } else {
-                               throw new 
InvalidFieldReferenceException("Nested field expression \""+tail+"\" not 
possible on atomic type "+fieldType+".");
+                               if (fieldType instanceof CompositeType<?>) {
+                                       // forward offset
+                                       for (int i = 0; i < fieldPos; i++) {
+                                               offset += 
this.getTypeAt(i).getTotalFields();
+                                       }
+                                       ((CompositeType<?>) 
fieldType).getFlatFields(tail, offset, result);
+                               } else {
+                                       throw new 
InvalidFieldReferenceException("Nested field expression \"" + tail + "\" not 
possible on atomic type " + fieldType + ".");
+                               }
                        }
                }
        }
@@ -213,17 +208,24 @@ public abstract class TupleTypeInfoBase<T> extends 
CompositeType<T> {
                if (obj instanceof TupleTypeInfoBase) {
                        @SuppressWarnings("unchecked")
                        TupleTypeInfoBase<T> other = (TupleTypeInfoBase<T>) obj;
-                       return ((this.tupleType == null && other.tupleType == 
null) || this.tupleType.equals(other.tupleType)) &&
-                                       Arrays.deepEquals(this.types, 
other.types);
-                       
+
+                       return other.canEqual(this) &&
+                               super.equals(other) &&
+                               Arrays.equals(types, other.types) &&
+                               totalFields == other.totalFields;
                } else {
                        return false;
                }
        }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof TupleTypeInfoBase;
+       }
        
        @Override
        public int hashCode() {
-               return this.types.hashCode() ^ Arrays.deepHashCode(this.types);
+               return 31 * (31 * super.hashCode() + Arrays.hashCode(types)) + 
totalFields;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 8f5d599..0196b5d 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -526,12 +526,33 @@ public class TypeExtractor {
                                } catch (ClassNotFoundException e) {
                                        throw new InvalidTypesException("Could 
not convert GenericArrayType to Class.");
                                }
+
                                return getForClass(classArray);
+                       } else {
+                               TypeInformation<?> componentInfo = 
createTypeInfoWithTypeHierarchy(
+                                       typeHierarchy,
+                                       genericArray.getGenericComponentType(),
+                                       in1Type,
+                                       in2Type);
+
+                               Class<OUT> classArray;
+
+                               try {
+                                       String componentClassName = 
componentInfo.getTypeClass().getName();
+                                       String resultingClassName;
+
+                                       if (componentClassName.startsWith("[")) 
{
+                                               resultingClassName = "[" + 
componentClassName;
+                                       } else {
+                                               resultingClassName = "[L" + 
componentClassName + ";";
+                                       }
+                                       classArray = (Class<OUT>) 
Class.forName(resultingClassName);
+                               } catch (ClassNotFoundException e) {
+                                       throw new InvalidTypesException("Could 
not convert GenericArrayType to Class.");
+                               }
+
+                               return 
ObjectArrayTypeInfo.getInfoFor(classArray, componentInfo);
                        }
-                       
-                       TypeInformation<?> componentInfo = 
createTypeInfoWithTypeHierarchy(typeHierarchy, 
genericArray.getGenericComponentType(),
-                                       in1Type, in2Type);
-                       return ObjectArrayTypeInfo.getInfoFor(t, componentInfo);
                }
                // objects with generics are treated as Class first
                else if (t instanceof ParameterizedType) {
@@ -1188,7 +1209,13 @@ public class TypeExtractor {
                        
                        // object arrays
                        else {
-                               return ObjectArrayTypeInfo.getInfoFor(clazz);
+                               TypeInformation<?> componentTypeInfo = 
createTypeInfoWithTypeHierarchy(
+                                       typeHierarchy,
+                                       clazz.getComponentType(),
+                                       in1Type,
+                                       in2Type);
+
+                               return ObjectArrayTypeInfo.getInfoFor(clazz, 
componentTypeInfo);
                        }
                }
                
@@ -1481,8 +1508,8 @@ public class TypeExtractor {
        private static TypeInformation<?> getTypeOfPojoField(TypeInformation<?> 
pojoInfo, Field field) {
                for (int j = 0; j < pojoInfo.getArity(); j++) {
                        PojoField pf = ((PojoTypeInfo<?>) 
pojoInfo).getPojoFieldAt(j);
-                       if (pf.field.getName().equals(field.getName())) {
-                               return pf.type;
+                       if (pf.getField().getName().equals(field.getName())) {
+                               return pf.getTypeInformation();
                        }
                }
                return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
index e61acd8..0b4823e 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.typeutils;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.AtomicType;
@@ -52,17 +53,13 @@ public class ValueTypeInfo<T extends Value> extends 
TypeInformation<T> implement
        private static final long serialVersionUID = 1L;
        
        private final Class<T> type;
-
        
        public ValueTypeInfo(Class<T> type) {
-               if (type == null) {
-                       throw new NullPointerException();
-               }
-               if (!Value.class.isAssignableFrom(type) && 
!type.equals(Value.class)) {
-                       throw new IllegalArgumentException("ValueTypeInfo can 
only be used for subclasses of " + Value.class.getName());
-               }
-               
-               this.type = type;
+               this.type = Preconditions.checkNotNull(type);
+
+               Preconditions.checkArgument(
+                       Value.class.isAssignableFrom(type) || 
type.equals(Value.class),
+                       "ValueTypeInfo can only be used for subclasses of " + 
Value.class.getName());
        }
        
        @Override
@@ -136,17 +133,26 @@ public class ValueTypeInfo<T extends Value> extends 
TypeInformation<T> implement
        
        @Override
        public int hashCode() {
-               return this.type.hashCode() ^ 0xd3a2646c;
+               return this.type.hashCode();
        }
        
        @Override
        public boolean equals(Object obj) {
-               if (obj.getClass() == ValueTypeInfo.class) {
-                       return type == ((ValueTypeInfo<?>) obj).type;
+               if (obj instanceof ValueTypeInfo) {
+                       @SuppressWarnings("unchecked")
+                       ValueTypeInfo<T> valueTypeInfo = (ValueTypeInfo<T>) obj;
+
+                       return valueTypeInfo.canEqual(this) &&
+                               type == valueTypeInfo.type;
                } else {
                        return false;
                }
        }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof ValueTypeInfo;
+       }
        
        @Override
        public String toString() {
@@ -155,7 +161,7 @@ public class ValueTypeInfo<T extends Value> extends 
TypeInformation<T> implement
        
        // 
--------------------------------------------------------------------------------------------
        
-       static final <X extends Value> TypeInformation<X> 
getValueTypeInfo(Class<X> typeClass) {
+       static <X extends Value> TypeInformation<X> getValueTypeInfo(Class<X> 
typeClass) {
                if (Value.class.isAssignableFrom(typeClass) && 
!typeClass.equals(Value.class)) {
                        return new ValueTypeInfo<X>(typeClass);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
index a91b888..6c140d9 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.typeutils;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.AtomicType;
@@ -41,13 +42,11 @@ public class WritableTypeInfo<T extends Writable> extends 
TypeInformation<T> imp
        private final Class<T> typeClass;
        
        public WritableTypeInfo(Class<T> typeClass) {
-               if (typeClass == null) {
-                       throw new NullPointerException();
-               }
-               if (!Writable.class.isAssignableFrom(typeClass) || typeClass == 
Writable.class) {
-                       throw new IllegalArgumentException("WritableTypeInfo 
can only be used for subclasses of " + Writable.class.getName());
-               }
-               this.typeClass = typeClass;
+               this.typeClass = Preconditions.checkNotNull(typeClass);
+
+               Preconditions.checkArgument(
+                       Writable.class.isAssignableFrom(typeClass) && 
!typeClass.equals(Writable.class),
+                       "WritableTypeInfo can only be used for subclasses of " 
+ Writable.class.getName());
        }
 
        @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -104,12 +103,26 @@ public class WritableTypeInfo<T extends Writable> extends 
TypeInformation<T> imp
        
        @Override
        public int hashCode() {
-               return typeClass.hashCode() ^ 0xd3a2646c;
+               return typeClass.hashCode();
        }
        
        @Override
        public boolean equals(Object obj) {
-               return obj.getClass() == WritableTypeInfo.class && typeClass == 
((WritableTypeInfo<?>) obj).typeClass;
+               if (obj instanceof WritableTypeInfo) {
+                       @SuppressWarnings("unchecked")
+                       WritableTypeInfo<T> writableTypeInfo = 
(WritableTypeInfo<T>) obj;
+
+                       return writableTypeInfo.canEqual(this) &&
+                               typeClass == writableTypeInfo.typeClass;
+
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof WritableTypeInfo;
        }
        
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
index 34dc500..26bf4ce 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import com.esotericsoftware.kryo.KryoException;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Preconditions;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
@@ -69,14 +70,10 @@ public final class AvroSerializer<T> extends 
TypeSerializer<T> {
        }
        
        public AvroSerializer(Class<T> type, Class<? extends T> 
typeToInstantiate) {
-               if (type == null || typeToInstantiate == null) {
-                       throw new NullPointerException();
-               }
+               this.type = Preconditions.checkNotNull(type);
+               this.typeToInstantiate = 
Preconditions.checkNotNull(typeToInstantiate);
                
                InstantiationUtil.checkForInstantiation(typeToInstantiate);
-               
-               this.type = type;
-               this.typeToInstantiate = typeToInstantiate;
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -192,16 +189,25 @@ public final class AvroSerializer<T> extends 
TypeSerializer<T> {
        
        @Override
        public int hashCode() {
-               return 0x42fba55c + this.type.hashCode() + 
this.typeToInstantiate.hashCode();
+               return 31 * this.type.hashCode() + 
this.typeToInstantiate.hashCode();
        }
        
        @Override
        public boolean equals(Object obj) {
-               if (obj.getClass() == AvroSerializer.class) {
-                       AvroSerializer<?> other = (AvroSerializer<?>) obj;
-                       return this.type == other.type && 
this.typeToInstantiate == other.typeToInstantiate;
+               if (obj instanceof AvroSerializer) {
+                       @SuppressWarnings("unchecked")
+                       AvroSerializer<T> avroSerializer = (AvroSerializer<T>) 
obj;
+
+                       return avroSerializer.canEqual(this) &&
+                               type == avroSerializer.type &&
+                               typeToInstantiate == 
avroSerializer.typeToInstantiate;
                } else {
                        return false;
                }
        }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof AvroSerializer;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
index 193d495..9e46f27 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import java.io.IOException;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -38,7 +39,7 @@ public class CopyableValueSerializer<T extends 
CopyableValue<T>> extends TypeSer
        
        
        public CopyableValueSerializer(Class<T> valueClass) {
-               this.valueClass = valueClass;
+               this.valueClass = Preconditions.checkNotNull(valueClass);
        }
 
        @Override
@@ -105,16 +106,24 @@ public class CopyableValueSerializer<T extends 
CopyableValue<T>> extends TypeSer
        
        @Override
        public int hashCode() {
-               return this.valueClass.hashCode() + 9231;
+               return this.valueClass.hashCode();
        }
        
        @Override
        public boolean equals(Object obj) {
-               if (obj.getClass() == CopyableValueSerializer.class) {
-                       CopyableValueSerializer<?> other = 
(CopyableValueSerializer<?>) obj;
-                       return this.valueClass == other.valueClass;
+               if (obj instanceof CopyableValueSerializer) {
+                       @SuppressWarnings("unchecked")
+                       CopyableValueSerializer<T> copyableValueSerializer = 
(CopyableValueSerializer<T>) obj;
+
+                       return copyableValueSerializer.canEqual(this) &&
+                               valueClass == 
copyableValueSerializer.valueClass;
                } else {
                        return false;
                }
        }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof CopyableValueSerializer;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 5d4553d..de24956 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -30,7 +30,9 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -51,33 +53,33 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
 
        private final Class<T> clazz;
 
-       private TypeSerializer<Object>[] fieldSerializers;
+       private final TypeSerializer<Object>[] fieldSerializers;
 
-       // We need to handle these ourselves in writeObject()/readObject()
-       private transient Field[] fields;
-
-       private int numFields;
+       private final int numFields;
 
-       private transient Map<Class<?>, TypeSerializer<?>> 
subclassSerializerCache;
-       private transient ClassLoader cl;
+       private final Map<Class<?>, Integer> registeredClasses;
 
-       private Map<Class<?>, Integer> registeredClasses;
-
-       private TypeSerializer<?>[] registeredSerializers;
+       private final TypeSerializer<?>[] registeredSerializers;
 
        private final ExecutionConfig executionConfig;
 
+       private transient Map<Class<?>, TypeSerializer<?>> 
subclassSerializerCache;
+       private transient ClassLoader cl;
+       // We need to handle these ourselves in writeObject()/readObject()
+       private transient Field[] fields;
+
        @SuppressWarnings("unchecked")
        public PojoSerializer(
                        Class<T> clazz,
                        TypeSerializer<?>[] fieldSerializers,
                        Field[] fields,
                        ExecutionConfig executionConfig) {
-               this.clazz = clazz;
-               this.fieldSerializers = (TypeSerializer<Object>[]) 
fieldSerializers;
-               this.fields = fields;
+
+               this.clazz = Preconditions.checkNotNull(clazz);
+               this.fieldSerializers = (TypeSerializer<Object>[]) 
Preconditions.checkNotNull(fieldSerializers);
+               this.fields = Preconditions.checkNotNull(fields);
                this.numFields = fieldSerializers.length;
-               this.executionConfig = executionConfig;
+               this.executionConfig = 
Preconditions.checkNotNull(executionConfig);
 
                LinkedHashSet<Class<?>> registeredPojoTypes = 
executionConfig.getRegisteredPojoTypes();
 
@@ -563,23 +565,28 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
        
        @Override
        public int hashCode() {
-               int hashCode = numFields * 47;
-               for (TypeSerializer<?> ser : this.fieldSerializers) {
-                       hashCode = (hashCode << 7) | (hashCode >>> -7);
-                       hashCode += ser.hashCode();
-               }
-               return hashCode;
+               return 31 * (31 * Arrays.hashCode(fieldSerializers) + 
Arrays.hashCode(registeredSerializers)) +
+                       Objects.hash(clazz, numFields, registeredClasses);
        }
        
        @Override
        public boolean equals(Object obj) {
-               if (obj != null && obj instanceof PojoSerializer) {
-                       PojoSerializer<?> otherTS = (PojoSerializer<?>) obj;
-                       return (otherTS.clazz == this.clazz) &&
-                                       
Arrays.deepEquals(this.fieldSerializers, otherTS.fieldSerializers);
-               }
-               else {
+               if (obj instanceof PojoSerializer) {
+                       PojoSerializer<?> other = (PojoSerializer<?>) obj;
+
+                       return other.canEqual(this) &&
+                               clazz == other.clazz &&
+                               Arrays.equals(fieldSerializers, 
other.fieldSerializers) &&
+                               Arrays.equals(registeredSerializers, 
other.registeredSerializers) &&
+                               numFields == other.numFields &&
+                               
registeredClasses.equals(other.registeredClasses);
+               } else {
                        return false;
                }
        }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof PojoSerializer;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
index 246cecf..a06ff1a 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
@@ -13,6 +13,7 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import java.io.IOException;
+
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.core.memory.DataInputView;
@@ -94,12 +95,23 @@ public class Tuple0Serializer extends 
TupleSerializer<Tuple0> {
 
        @Override
        public int hashCode() {
-               return 1837461876;
+               return Tuple0Serializer.class.hashCode();
        }
 
        @Override
        public boolean equals(Object obj) {
-               return obj == this || obj instanceof Tuple0Serializer;
+               if (obj instanceof Tuple0Serializer) {
+                       Tuple0Serializer other = (Tuple0Serializer) obj;
+
+                       return other.canEqual(this);
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof Tuple0Serializer;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index f041736..bf3c7a1 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Objects;
 
 
 public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
@@ -32,14 +34,14 @@ public abstract class TupleSerializerBase<T> extends 
TypeSerializer<T> {
 
        protected final Class<T> tupleClass;
 
-       protected TypeSerializer<Object>[] fieldSerializers;
+       protected final TypeSerializer<Object>[] fieldSerializers;
 
        protected final int arity;
 
        @SuppressWarnings("unchecked")
        public TupleSerializerBase(Class<T> tupleClass, TypeSerializer<?>[] 
fieldSerializers) {
-               this.tupleClass = tupleClass;
-               this.fieldSerializers = (TypeSerializer<Object>[]) 
fieldSerializers;
+               this.tupleClass = Preconditions.checkNotNull(tupleClass);
+               this.fieldSerializers = (TypeSerializer<Object>[]) 
Preconditions.checkNotNull(fieldSerializers);
                this.arity = fieldSerializers.length;
        }
        
@@ -74,23 +76,25 @@ public abstract class TupleSerializerBase<T> extends 
TypeSerializer<T> {
        
        @Override
        public int hashCode() {
-               int hashCode = arity * 47;
-               for (TypeSerializer<?> ser : this.fieldSerializers) {
-                       hashCode = (hashCode << 7) | (hashCode >>> -7);
-                       hashCode += ser.hashCode();
-               }
-               return hashCode;
+               return 31 * Arrays.hashCode(fieldSerializers) + 
Objects.hash(tupleClass, arity);
        }
        
        @Override
        public boolean equals(Object obj) {
-               if (obj != null && obj instanceof TupleSerializerBase) {
-                       TupleSerializerBase<?> otherTS = 
(TupleSerializerBase<?>) obj;
-                       return (otherTS.tupleClass == this.tupleClass) && 
-                                       
Arrays.deepEquals(this.fieldSerializers, otherTS.fieldSerializers);
-               }
-               else {
+               if (obj instanceof TupleSerializerBase) {
+                       TupleSerializerBase<?> other = (TupleSerializerBase<?>) 
obj;
+
+                       return other.canEqual(this) &&
+                               tupleClass == other.tupleClass &&
+                               Arrays.equals(fieldSerializers, 
other.fieldSerializers) &&
+                               arity == other.arity;
+               } else {
                        return false;
                }
        }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof TupleSerializerBase;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
index ad1b0f0..179ef19 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import java.io.IOException;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -47,11 +48,7 @@ public class ValueSerializer<T extends Value> extends 
TypeSerializer<T> {
        // 
--------------------------------------------------------------------------------------------
        
        public ValueSerializer(Class<T> type) {
-               if (type == null) {
-                       throw new NullPointerException();
-               }
-               
-               this.type = type;
+               this.type = Preconditions.checkNotNull(type);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -126,16 +123,22 @@ public class ValueSerializer<T extends Value> extends 
TypeSerializer<T> {
        
        @Override
        public int hashCode() {
-               return this.type.hashCode() + 17;
+               return this.type.hashCode();
        }
        
        @Override
        public boolean equals(Object obj) {
-               if (obj.getClass() == ValueSerializer.class) {
+               if (obj instanceof ValueSerializer) {
                        ValueSerializer<?> other = (ValueSerializer<?>) obj;
-                       return this.type == other.type;
+
+                       return other.canEqual(this) && type == other.type;
                } else {
                        return false;
                }
        }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof ValueSerializer;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
index 60012ee..d854f52 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -121,16 +121,22 @@ public class WritableSerializer<T extends Writable> 
extends TypeSerializer<T> {
        
        @Override
        public int hashCode() {
-               return this.typeClass.hashCode() + 177;
+               return this.typeClass.hashCode();
        }
        
        @Override
        public boolean equals(Object obj) {
-               if (obj.getClass() == WritableSerializer.class) {
+               if (obj instanceof WritableSerializer) {
                        WritableSerializer<?> other = (WritableSerializer<?>) 
obj;
-                       return this.typeClass == other.typeClass;
+
+                       return other.canEqual(this) && typeClass == 
other.typeClass;
                } else {
                        return false;
                }
        }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof WritableSerializer;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 1bc6771..f825fc6 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -25,6 +25,7 @@ import 
com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Preconditions;
 import com.twitter.chill.ScalaKryoInstantiator;
 
 import org.apache.avro.generic.GenericData;
@@ -45,6 +46,7 @@ import java.lang.reflect.Modifier;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * A type serializer that serializes its type using the Kryo serialization
@@ -84,10 +86,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
        // 
------------------------------------------------------------------------
 
        public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
-               if(type == null){
-                       throw new NullPointerException("Type class cannot be 
null.");
-               }
-               this.type = type;
+               this.type = Preconditions.checkNotNull(type);
 
                this.defaultSerializers = 
executionConfig.getDefaultKryoSerializers();
                this.defaultSerializerClasses = 
executionConfig.getDefaultKryoSerializerClasses();
@@ -241,19 +240,34 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
        
        @Override
        public int hashCode() {
-               return type.hashCode();
+               return Objects.hash(
+                       type,
+                       registeredTypes,
+                       registeredTypesWithSerializerClasses,
+                       defaultSerializerClasses);
        }
        
        @Override
        public boolean equals(Object obj) {
-               if (obj != null && obj instanceof KryoSerializer) {
+               if (obj instanceof KryoSerializer) {
                        KryoSerializer<?> other = (KryoSerializer<?>) obj;
-                       return other.type == this.type;
+
+                       // we cannot include the Serializers here because they 
don't implement the equals method
+                       return other.canEqual(this) &&
+                               type == other.type &&
+                               registeredTypes.equals(other.registeredTypes) &&
+                               
registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses)
 &&
+                               
defaultSerializerClasses.equals(other.defaultSerializerClasses);
                } else {
                        return false;
                }
        }
-       
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof KryoSerializer;
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        private void checkKryoInitialized() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index ad491d0..ebaa44d 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -43,6 +43,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 public class CollectionInputFormatTest {
        
@@ -265,8 +266,8 @@ public class CollectionInputFormatTest {
 
                private static final long serialVersionUID = 1L;
                
-               private boolean failOnRead;
-               private boolean failOnWrite;
+               private final boolean failOnRead;
+               private final boolean failOnWrite;
                
                public TestSerializer(boolean failOnRead, boolean failOnWrite) {
                        this.failOnRead = failOnRead;
@@ -331,5 +332,26 @@ public class CollectionInputFormatTest {
                public void copy(DataInputView source, DataOutputView target) 
throws IOException {
                        target.writeInt(source.readInt());
                }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj instanceof TestSerializer) {
+                               TestSerializer other = (TestSerializer) obj;
+
+                               return other.canEqual(this) && failOnRead == 
other.failOnRead && failOnWrite == other.failOnWrite;
+                       } else {
+                               return false;
+                       }
+               }
+
+               @Override
+               public boolean canEqual(Object obj) {
+                       return obj instanceof TestSerializer;
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(failOnRead, failOnWrite);
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
index 34fde20..96ba264 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
@@ -336,37 +336,37 @@ public class PojoTypeExtractionTest {
                                tupleSeen = false, objectSeen = false, 
writableSeen = false, collectionSeen = false;
                for(int i = 0; i < pojoTypeComplexNested.getArity(); i++) {
                        PojoField field = 
pojoTypeComplexNested.getPojoFieldAt(i);
-                       String name = field.field.getName();
+                       String name = field.getField().getName();
                        if(name.equals("date")) {
                                if(dateSeen) {
                                        Assert.fail("already seen");
                                }
                                dateSeen = true;
-                               
Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, field.type);
-                               Assert.assertEquals(Date.class, 
field.type.getTypeClass());
+                               
Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, field.getTypeInformation());
+                               Assert.assertEquals(Date.class, 
field.getTypeInformation().getTypeClass());
                        } else if(name.equals("someNumberWithÜnicödeNäme")) {
                                if(intSeen) {
                                        Assert.fail("already seen");
                                }
                                intSeen = true;
-                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
-                               Assert.assertEquals(Integer.class, 
field.type.getTypeClass());
+                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
+                               Assert.assertEquals(Integer.class, 
field.getTypeInformation().getTypeClass());
                        } else if(name.equals("someFloat")) {
                                if(floatSeen) {
                                        Assert.fail("already seen");
                                }
                                floatSeen = true;
-                               
Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, field.type);
-                               Assert.assertEquals(Float.class, 
field.type.getTypeClass());
+                               
Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, field.getTypeInformation());
+                               Assert.assertEquals(Float.class, 
field.getTypeInformation().getTypeClass());
                        } else if(name.equals("word")) {
                                if(tupleSeen) {
                                        Assert.fail("already seen");
                                }
                                tupleSeen = true;
-                               Assert.assertTrue(field.type instanceof 
TupleTypeInfo<?>);
-                               Assert.assertEquals(Tuple3.class, 
field.type.getTypeClass());
+                               Assert.assertTrue(field.getTypeInformation() 
instanceof TupleTypeInfo<?>);
+                               Assert.assertEquals(Tuple3.class, 
field.getTypeInformation().getTypeClass());
                                // do some more advanced checks on the tuple
-                               TupleTypeInfo<?> tupleTypeFromComplexNested = 
(TupleTypeInfo<?>) field.type;
+                               TupleTypeInfo<?> tupleTypeFromComplexNested = 
(TupleTypeInfo<?>) field.getTypeInformation();
                                
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, 
tupleTypeFromComplexNested.getTypeAt(0));
                                
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, 
tupleTypeFromComplexNested.getTypeAt(1));
                                
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
tupleTypeFromComplexNested.getTypeAt(2));
@@ -375,21 +375,21 @@ public class PojoTypeExtractionTest {
                                        Assert.fail("already seen");
                                }
                                objectSeen = true;
-                               Assert.assertEquals(new 
GenericTypeInfo<Object>(Object.class), field.type);
-                               Assert.assertEquals(Object.class, 
field.type.getTypeClass());
+                               Assert.assertEquals(new 
GenericTypeInfo<Object>(Object.class), field.getTypeInformation());
+                               Assert.assertEquals(Object.class, 
field.getTypeInformation().getTypeClass());
                        } else if(name.equals("hadoopCitizen")) {
                                if(writableSeen) {
                                        Assert.fail("already seen");
                                }
                                writableSeen = true;
-                               Assert.assertEquals(new 
WritableTypeInfo<MyWritable>(MyWritable.class), field.type);
-                               Assert.assertEquals(MyWritable.class, 
field.type.getTypeClass());
+                               Assert.assertEquals(new 
WritableTypeInfo<MyWritable>(MyWritable.class), field.getTypeInformation());
+                               Assert.assertEquals(MyWritable.class, 
field.getTypeInformation().getTypeClass());
                        } else if(name.equals("collection")) {
                                if(collectionSeen) {
                                        Assert.fail("already seen");
                                }
                                collectionSeen = true;
-                               Assert.assertEquals(new 
GenericTypeInfo(List.class), field.type);
+                               Assert.assertEquals(new 
GenericTypeInfo(List.class), field.getTypeInformation());
 
                        } else {
                                Assert.fail("field "+field+" is not expected");
@@ -428,28 +428,28 @@ public class PojoTypeExtractionTest {
                PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeInformation;
                for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
                        PojoField field = pojoTypeForClass.getPojoFieldAt(i);
-                       String name = field.field.getName();
+                       String name = field.getField().getName();
                        if(name.equals("somethingFancy")) {
                                if(arrayListSeen) {
                                        Assert.fail("already seen");
                                }
                                arrayListSeen = true;
-                               Assert.assertTrue(field.type instanceof 
GenericTypeInfo);
-                               Assert.assertEquals(ArrayList.class, 
field.type.getTypeClass());
+                               Assert.assertTrue(field.getTypeInformation() 
instanceof GenericTypeInfo);
+                               Assert.assertEquals(ArrayList.class, 
field.getTypeInformation().getTypeClass());
                        } else if(name.equals("fancyIds")) {
                                if(multisetSeen) {
                                        Assert.fail("already seen");
                                }
                                multisetSeen = true;
-                               Assert.assertTrue(field.type instanceof 
GenericTypeInfo);
-                               Assert.assertEquals(HashMultiset.class, 
field.type.getTypeClass());
+                               Assert.assertTrue(field.getTypeInformation() 
instanceof GenericTypeInfo);
+                               Assert.assertEquals(HashMultiset.class, 
field.getTypeInformation().getTypeClass());
                        } else if(name.equals("fancyArray")) {
                                if(strArraySeen) {
                                        Assert.fail("already seen");
                                }
                                strArraySeen = true;
-                               
Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.type);
-                               Assert.assertEquals(String[].class, 
field.type.getTypeClass());
+                               
Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, 
field.getTypeInformation());
+                               Assert.assertEquals(String[].class, 
field.getTypeInformation().getTypeClass());
                        } else if(Arrays.asList("date", 
"someNumberWithÜnicödeNäme", "someFloat", "word", "nothing", 
"hadoopCitizen", "collection").contains(name)) {
                                // ignore these, they are inherited from the 
ComplexNestedClass
                        }
@@ -479,13 +479,13 @@ public class PojoTypeExtractionTest {
                PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeInformation;
                for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
                        PojoField field = pojoTypeForClass.getPojoFieldAt(i);
-                       String name = field.field.getName();
+                       String name = field.getField().getName();
                        if(name.equals("special")) {
-                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
                        } else if(name.equals("f0") || name.equals("f1")) {
-                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type);
+                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
                        } else if(name.equals("f2")) {
-                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.type);
+                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation());
                        } else {
                                Assert.fail("unexpected field");
                        }
@@ -499,15 +499,15 @@ public class PojoTypeExtractionTest {
                PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeForClass;
                for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
                        PojoField field = pojoTypeForClass.getPojoFieldAt(i);
-                       String name = field.field.getName();
+                       String name = field.getField().getName();
                        if(name.equals("field1")) {
-                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
                        } else if (name.equals("field2")) {
-                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.type);
+                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation());
                        } else if (name.equals("field3")) {
-                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type);
+                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
                        } else if (name.equals("key")) {
-                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
                        } else {
                                Assert.fail("Unexpected field "+field);
                        }
@@ -525,13 +525,13 @@ public class PojoTypeExtractionTest {
                PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeForClass;
                for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
                        PojoField field = pojoTypeForClass.getPojoFieldAt(i);
-                       String name = field.field.getName();
+                       String name = field.getField().getName();
                        if(name.equals("field1")) {
-                               Assert.assertEquals(new 
GenericTypeInfo<Object>(Object.class), field.type);
+                               Assert.assertEquals(new 
GenericTypeInfo<Object>(Object.class), field.getTypeInformation());
                        } else if (name.equals("field2")) {
-                               Assert.assertEquals(new 
GenericTypeInfo<Object>(Object.class), field.type);
+                               Assert.assertEquals(new 
GenericTypeInfo<Object>(Object.class), field.getTypeInformation());
                        } else if (name.equals("key")) {
-                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
                        } else {
                                Assert.fail("Unexpected field "+field);
                        }
@@ -546,14 +546,14 @@ public class PojoTypeExtractionTest {
                PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeForClass;
                for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
                        PojoField field = pojoTypeForClass.getPojoFieldAt(i);
-                       String name = field.field.getName();
+                       String name = field.getField().getName();
                        if(name.equals("field1")) {
-                               Assert.assertTrue(field.type instanceof 
PojoTypeInfo<?>); // From tuple is pojo (not tuple type!)
+                               Assert.assertTrue(field.getTypeInformation() 
instanceof PojoTypeInfo<?>); // From tuple is pojo (not tuple type!)
                        } else if (name.equals("field2")) {
-                               Assert.assertTrue(field.type instanceof 
TupleTypeInfo<?>);
-                               Assert.assertTrue( 
((TupleTypeInfo<?>)field.type).getTypeAt(0).equals(BasicTypeInfo.STRING_TYPE_INFO)
 );
+                               Assert.assertTrue(field.getTypeInformation() 
instanceof TupleTypeInfo<?>);
+                               Assert.assertTrue( 
((TupleTypeInfo<?>)field.getTypeInformation()).getTypeAt(0).equals(BasicTypeInfo.STRING_TYPE_INFO)
 );
                        } else if (name.equals("key")) {
-                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
                        } else {
                                Assert.fail("Unexpected field "+field);
                        }
@@ -641,13 +641,13 @@ public class PojoTypeExtractionTest {
                PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
                for(int i = 0; i < pti.getArity(); i++) {
                        PojoField field = pti.getPojoFieldAt(i);
-                       String name = field.field.getName();
+                       String name = field.getField().getName();
                        if(name.equals("field1")) {
-                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type);
+                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
                        } else if (name.equals("field2")) {
-                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type);
+                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
                        } else if (name.equals("key")) {
-                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
                        } else {
                                Assert.fail("Unexpected field "+field);
                        }
@@ -680,15 +680,15 @@ public class PojoTypeExtractionTest {
                PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
                for(int i = 0; i < pti.getArity(); i++) {
                        PojoField field = pti.getPojoFieldAt(i);
-                       String name = field.field.getName();
+                       String name = field.getField().getName();
                        if(name.equals("extraField")) {
-                               
Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, field.type);
+                               
Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, field.getTypeInformation());
                        } else if (name.equals("f0")) {
-                               
Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, field.type);
+                               
Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, 
field.getTypeInformation());
                        } else if (name.equals("f1")) {
-                               
Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, field.type);
+                               
Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, 
field.getTypeInformation());
                        } else if (name.equals("f2")) {
-                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.type);
+                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation());
                        } else {
                                Assert.fail("Unexpected field "+field);
                        }
@@ -831,7 +831,7 @@ public class PojoTypeExtractionTest {
        public void testRecursivePojo1() {
                TypeInformation<?> ti = 
TypeExtractor.createTypeInfo(RecursivePojo1.class);
                Assert.assertTrue(ti instanceof PojoTypeInfo);
-               Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) 
ti).getPojoFieldAt(0).type.getClass());
+               Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) 
ti).getPojoFieldAt(0).getTypeInformation().getClass());
        }
 
        @Test
@@ -839,8 +839,8 @@ public class PojoTypeExtractionTest {
                TypeInformation<?> ti = 
TypeExtractor.createTypeInfo(RecursivePojo2.class);
                Assert.assertTrue(ti instanceof PojoTypeInfo);
                PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0);
-               Assert.assertTrue(pf.type instanceof TupleTypeInfo);
-               Assert.assertEquals(GenericTypeInfo.class, ((TupleTypeInfo) 
pf.type).getTypeAt(0).getClass());
+               Assert.assertTrue(pf.getTypeInformation() instanceof 
TupleTypeInfo);
+               Assert.assertEquals(GenericTypeInfo.class, ((TupleTypeInfo) 
pf.getTypeInformation()).getTypeAt(0).getClass());
        }
 
        @Test
@@ -848,8 +848,8 @@ public class PojoTypeExtractionTest {
                TypeInformation<?> ti = 
TypeExtractor.createTypeInfo(RecursivePojo3.class);
                Assert.assertTrue(ti instanceof PojoTypeInfo);
                PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0);
-               Assert.assertTrue(pf.type instanceof PojoTypeInfo);
-               Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) 
pf.type).getPojoFieldAt(0).type.getClass());
+               Assert.assertTrue(pf.getTypeInformation() instanceof 
PojoTypeInfo);
+               Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) 
pf.getTypeInformation()).getPojoFieldAt(0).getTypeInformation().getClass());
        }
 
        public static class FooBarPojo {

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index d27a82b..7f0cf5b 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -66,6 +66,8 @@ import org.apache.hadoop.io.Writable;
 import org.junit.Assert;
 import org.junit.Test;
 
+import javax.xml.bind.TypeConstraintException;
+
 
 public class TypeExtractorTest {
 
@@ -1237,7 +1239,7 @@ public class TypeExtractorTest {
                TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation) 
TypeInfoParser.parse("org.apache.flink.api.java.type.extractor.TypeExtractorTest$CustomArrayObject[]"));
 
                Assert.assertTrue(ti instanceof ObjectArrayTypeInfo<?, ?>);
-               Assert.assertEquals(CustomArrayObject.class, 
((ObjectArrayTypeInfo<?, ?>) ti).getComponentType());
+               Assert.assertEquals(CustomArrayObject.class, 
((ObjectArrayTypeInfo<?, ?>) ti).getComponentInfo().getTypeClass());
        }
 
        @SuppressWarnings({ "rawtypes", "unchecked" })

Reply via email to