http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
deleted file mode 100644
index 276ffc4..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime.kryo;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.KryoException;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.base.Preconditions;
-
-import org.apache.avro.generic.GenericData;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
-import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
-import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
-import 
org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * A type serializer that serializes its type using the Kryo serialization
- * framework (https://github.com/EsotericSoftware/kryo).
- * 
- * This serializer is intended as a fallback serializer for the cases that are
- * not covered by the basic types, tuples, and POJOs.
- *
- * @param <T> The type to be serialized.
- */
-public class KryoSerializer<T> extends TypeSerializer<T> {
-
-       private static final long serialVersionUID = 3L;
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(KryoSerializer.class);
-
-       // 
------------------------------------------------------------------------
-
-       private final LinkedHashMap<Class<?>, 
ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
-       private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> 
registeredTypesWithSerializerClasses;
-       private final LinkedHashMap<Class<?>, 
ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
-       private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> 
defaultSerializerClasses;
-       private final LinkedHashSet<Class<?>> registeredTypes;
-
-       private final Class<T> type;
-       
-       // 
------------------------------------------------------------------------
-       // The fields below are lazily initialized after duplication or 
deserialization.
-
-       private transient Kryo kryo;
-       private transient T copyInstance;
-       
-       private transient DataOutputView previousOut;
-       private transient DataInputView previousIn;
-       
-       private transient Input input;
-       private transient Output output;
-
-       // 
------------------------------------------------------------------------
-
-       public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
-               this.type = Preconditions.checkNotNull(type);
-
-               this.defaultSerializers = 
executionConfig.getDefaultKryoSerializers();
-               this.defaultSerializerClasses = 
executionConfig.getDefaultKryoSerializerClasses();
-               this.registeredTypesWithSerializers = 
executionConfig.getRegisteredTypesWithKryoSerializers();
-               this.registeredTypesWithSerializerClasses = 
executionConfig.getRegisteredTypesWithKryoSerializerClasses();
-               this.registeredTypes = executionConfig.getRegisteredKryoTypes();
-       }
-
-       /**
-        * Copy-constructor that does not copy transient fields. They will be 
initialized once required.
-        */
-       protected KryoSerializer(KryoSerializer<T> toCopy) {
-               registeredTypesWithSerializers = 
toCopy.registeredTypesWithSerializers;
-               registeredTypesWithSerializerClasses = 
toCopy.registeredTypesWithSerializerClasses;
-               defaultSerializers = toCopy.defaultSerializers;
-               defaultSerializerClasses = toCopy.defaultSerializerClasses;
-               registeredTypes = toCopy.registeredTypes;
-
-               type = toCopy.type;
-               if(type == null){
-                       throw new NullPointerException("Type class cannot be 
null.");
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public boolean isImmutableType() {
-               return false;
-       }
-
-       @Override
-       public KryoSerializer<T> duplicate() {
-               return new KryoSerializer<T>(this);
-       }
-
-       @Override
-       public T createInstance() {
-               if(Modifier.isAbstract(type.getModifiers()) || 
Modifier.isInterface(type.getModifiers()) ) {
-                       return null;
-               } else {
-                       checkKryoInitialized();
-                       try {
-                               return kryo.newInstance(type);
-                       } catch(Throwable e) {
-                               return null;
-                       }
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public T copy(T from) {
-               if (from == null) {
-                       return null;
-               }
-               checkKryoInitialized();
-               try {
-                       return kryo.copy(from);
-               }
-               catch(KryoException ke) {
-                       // kryo was unable to copy it, so we do it through 
serialization:
-                       ByteArrayOutputStream baout = new 
ByteArrayOutputStream();
-                       Output output = new Output(baout);
-
-                       kryo.writeObject(output, from);
-
-                       output.close();
-
-                       ByteArrayInputStream bain = new 
ByteArrayInputStream(baout.toByteArray());
-                       Input input = new Input(bain);
-
-                       return (T)kryo.readObject(input, from.getClass());
-               }
-       }
-       
-       @Override
-       public T copy(T from, T reuse) {
-               return copy(from);
-       }
-
-       @Override
-       public int getLength() {
-               return -1;
-       }
-
-       @Override
-       public void serialize(T record, DataOutputView target) throws 
IOException {
-               checkKryoInitialized();
-               if (target != previousOut) {
-                       DataOutputViewStream outputStream = new 
DataOutputViewStream(target);
-                       output = new Output(outputStream);
-                       previousOut = target;
-               }
-
-               // Sanity check: Make sure that the output is cleared/has been 
flushed by the last call
-               // otherwise data might be written multiple times in case of a 
previous EOFException
-               if (output.position() != 0) {
-                       throw new IllegalStateException("The Kryo Output still 
contains data from a previous " +
-                               "serialize call. It has to be flushed or 
cleared at the end of the serialize call.");
-               }
-
-               try {
-                       kryo.writeClassAndObject(output, record);
-                       output.flush();
-               }
-               catch (KryoException ke) {
-                       // make sure that the Kryo output buffer is cleared in 
case that we can recover from
-                       // the exception (e.g. EOFException which denotes 
buffer full)
-                       output.clear();
-
-                       Throwable cause = ke.getCause();
-                       if (cause instanceof EOFException) {
-                               throw (EOFException) cause;
-                       }
-                       else {
-                               throw ke;
-                       }
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public T deserialize(DataInputView source) throws IOException {
-               checkKryoInitialized();
-               if (source != previousIn) {
-                       DataInputViewStream inputStream = new 
DataInputViewStream(source);
-                       input = new NoFetchingInput(inputStream);
-                       previousIn = source;
-               }
-
-               try {
-                       return (T) kryo.readClassAndObject(input);
-               } catch (KryoException ke) {
-                       Throwable cause = ke.getCause();
-
-                       if (cause instanceof EOFException) {
-                               throw (EOFException) cause;
-                       } else {
-                               throw ke;
-                       }
-               }
-       }
-       
-       @Override
-       public T deserialize(T reuse, DataInputView source) throws IOException {
-               return deserialize(source);
-       }
-
-       @Override
-       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-               checkKryoInitialized();
-               if(this.copyInstance == null){
-                       this.copyInstance = createInstance();
-               }
-
-               T tmp = deserialize(copyInstance, source);
-               serialize(tmp, target);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public int hashCode() {
-               return Objects.hash(
-                       type,
-                       registeredTypes,
-                       registeredTypesWithSerializerClasses,
-                       defaultSerializerClasses);
-       }
-       
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof KryoSerializer) {
-                       KryoSerializer<?> other = (KryoSerializer<?>) obj;
-
-                       // we cannot include the Serializers here because they 
don't implement the equals method
-                       return other.canEqual(this) &&
-                               type == other.type &&
-                               registeredTypes.equals(other.registeredTypes) &&
-                               
registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses)
 &&
-                               
defaultSerializerClasses.equals(other.defaultSerializerClasses);
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof KryoSerializer;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       /**
-        * Returns the Chill Kryo Serializer which is implictly added to the 
classpath via flink-runtime.
-        * Falls back to the default Kryo serializer if it can't be found.
-        * @return The Kryo serializer instance.
-        */
-       private Kryo getKryoInstance() {
-
-               try {
-                       // check if ScalaKryoInstantiator is in class path 
(coming from Twitter's Chill library).
-                       // This will be true if Flink's Scala API is used.
-                       Class<?> chillInstantiatorClazz = 
Class.forName("com.twitter.chill.ScalaKryoInstantiator");
-                       Object chillInstantiator = 
chillInstantiatorClazz.newInstance();
-
-                       // obtain a Kryo instance through Twitter Chill
-                       Method m = chillInstantiatorClazz.getMethod("newKryo");
-
-                       return (Kryo) m.invoke(chillInstantiator);
-               } catch (ClassNotFoundException | InstantiationException | 
NoSuchMethodException |
-                       IllegalAccessException | InvocationTargetException e) {
-
-                       LOG.warn("Falling back to default Kryo serializer 
because Chill serializer couldn't be found.", e);
-
-                       Kryo.DefaultInstantiatorStrategy initStrategy = new 
Kryo.DefaultInstantiatorStrategy();
-                       initStrategy.setFallbackInstantiatorStrategy(new 
StdInstantiatorStrategy());
-
-                       Kryo kryo = new Kryo();
-                       kryo.setInstantiatorStrategy(initStrategy);
-
-                       return kryo;
-               }
-       }
-
-       private void checkKryoInitialized() {
-               if (this.kryo == null) {
-                       this.kryo = getKryoInstance();
-
-                       // disable reference tracking. reference tracking is 
costly, usually unnecessary, and
-                       // inconsistent with Flink's own serialization (which 
does not do reference tracking)
-                       kryo.setReferences(false);
-                       
-                       // Throwable and all subclasses should be serialized 
via java serialization
-                       kryo.addDefaultSerializer(Throwable.class, new 
JavaSerializer());
-
-                       // Add default serializers first, so that they type 
registrations without a serializer
-                       // are registered with a default serializer
-                       for (Map.Entry<Class<?>, 
ExecutionConfig.SerializableSerializer<?>> entry: 
defaultSerializers.entrySet()) {
-                               kryo.addDefaultSerializer(entry.getKey(), 
entry.getValue().getSerializer());
-                       }
-
-                       for (Map.Entry<Class<?>, Class<? extends 
Serializer<?>>> entry: defaultSerializerClasses.entrySet()) {
-                               kryo.addDefaultSerializer(entry.getKey(), 
entry.getValue());
-                       }
-
-                       // register the type of our class
-                       kryo.register(type);
-
-                       // register given types. we do this first so that any 
registration of a
-                       // more specific serializer overrides this
-                       for (Class<?> type : registeredTypes) {
-                               kryo.register(type);
-                       }
-
-                       // register given serializer classes
-                       for (Map.Entry<Class<?>, Class<? extends 
Serializer<?>>> e : registeredTypesWithSerializerClasses.entrySet()) {
-                               Class<?> typeClass = e.getKey();
-                               Class<? extends Serializer<?>> serializerClass 
= e.getValue();
-
-                               Serializer<?> serializer =
-                                               
ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass);
-                               kryo.register(typeClass, serializer);
-                       }
-
-                       // register given serializers
-                       for (Map.Entry<Class<?>, 
ExecutionConfig.SerializableSerializer<?>> e : 
registeredTypesWithSerializers.entrySet()) {
-                               kryo.register(e.getKey(), 
e.getValue().getSerializer());
-                       }
-                       // this is needed for Avro but can not be added on 
demand.
-                       kryo.register(GenericData.Array.class, new 
SpecificInstanceCollectionSerializerForArrayList());
-
-                       kryo.setRegistrationRequired(false);
-                       
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // For testing
-       // 
--------------------------------------------------------------------------------------------
-       
-       public Kryo getKryo() {
-               checkKryoInitialized();
-               return this.kryo;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
deleted file mode 100644
index 8bac729..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime.kryo;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.CollectionSerializer;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.lang.reflect.GenericArrayType;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-
-
-/**
- * Class containing utilities for the serializers of the Flink Runtime.
- *
- * Most of the serializers are automatically added to the system.
- *
- * Note that users can also implement the {@link 
com.esotericsoftware.kryo.KryoSerializable} interface
- * to provide custom serialization for their classes.
- * Also, there is a Java Annotation for adding a default serializer 
(@DefaultSerializer) to classes.
- */
-public class Serializers {
-
-       public static void recursivelyRegisterType(TypeInformation<?> typeInfo, 
ExecutionConfig config, Set<Class<?>> alreadySeen) {
-               if (typeInfo instanceof GenericTypeInfo) {
-                       GenericTypeInfo<?> genericTypeInfo = 
(GenericTypeInfo<?>) typeInfo;
-                       
Serializers.recursivelyRegisterType(genericTypeInfo.getTypeClass(), config, 
alreadySeen);
-               }
-               else if (typeInfo instanceof CompositeType) {
-                       List<GenericTypeInfo<?>> genericTypesInComposite = new 
ArrayList<>();
-                       
Utils.getContainedGenericTypes((CompositeType<?>)typeInfo, 
genericTypesInComposite);
-                       for (GenericTypeInfo<?> gt : genericTypesInComposite) {
-                               
Serializers.recursivelyRegisterType(gt.getTypeClass(), config, alreadySeen);
-                       }
-               }
-               else if (typeInfo instanceof ObjectArrayTypeInfo) {
-                       ObjectArrayTypeInfo<?, ?> objectArrayTypeInfo = 
(ObjectArrayTypeInfo<?, ?>) typeInfo;
-                       
recursivelyRegisterType(objectArrayTypeInfo.getComponentInfo(), config, 
alreadySeen);
-               }
-       }
-       
-       public static void recursivelyRegisterType(Class<?> type, 
ExecutionConfig config, Set<Class<?>> alreadySeen) {
-               // don't register or remember primitives
-               if (type == null || type.isPrimitive() || type == Object.class) 
{
-                       return;
-               }
-               
-               // prevent infinite recursion for recursive types
-               if (!alreadySeen.add(type)) {
-                       return;
-               }
-               
-               if (type.isArray()) {
-                       recursivelyRegisterType(type.getComponentType(), 
config, alreadySeen);
-               }
-               else {
-                       config.registerKryoType(type);
-                       checkAndAddSerializerForTypeAvro(config, type);
-       
-                       Field[] fields = type.getDeclaredFields();
-                       for (Field field : fields) {
-                               if (Modifier.isStatic(field.getModifiers()) || 
Modifier.isTransient(field.getModifiers())) {
-                                       continue;
-                               }
-                               Type fieldType = field.getGenericType();
-                               recursivelyRegisterGenericType(fieldType, 
config, alreadySeen);
-                       }
-               }
-       }
-       
-       private static void recursivelyRegisterGenericType(Type fieldType, 
ExecutionConfig config, Set<Class<?>> alreadySeen) {
-               if (fieldType instanceof ParameterizedType) {
-                       // field has generics
-                       ParameterizedType parameterizedFieldType = 
(ParameterizedType) fieldType;
-                       
-                       for (Type t: 
parameterizedFieldType.getActualTypeArguments()) {
-                               if (TypeExtractor.isClassType(t) ) {
-                                       
recursivelyRegisterType(TypeExtractor.typeToClass(t), config, alreadySeen);
-                               }
-                       }
-
-                       
recursivelyRegisterGenericType(parameterizedFieldType.getRawType(), config, 
alreadySeen);
-               }
-               else if (fieldType instanceof GenericArrayType) {
-                       GenericArrayType genericArrayType = (GenericArrayType) 
fieldType;
-                       
recursivelyRegisterGenericType(genericArrayType.getGenericComponentType(), 
config, alreadySeen);
-               }
-               else if (fieldType instanceof Class) {
-                       Class<?> clazz = (Class<?>) fieldType;
-                       recursivelyRegisterType(clazz, config, alreadySeen);
-               }
-       }
-
-       private static void checkAndAddSerializerForTypeAvro(ExecutionConfig 
reg, Class<?> type) {
-               if (GenericData.Record.class.isAssignableFrom(type)) {
-                       registerGenericAvro(reg);
-               }
-               if (SpecificRecordBase.class.isAssignableFrom(type)) {
-                       @SuppressWarnings("unchecked")
-                       Class<? extends SpecificRecordBase> specRecordClass = 
(Class<? extends SpecificRecordBase>) type;
-                       registerSpecificAvro(reg, specRecordClass);
-               }
-       }
-
-       /**
-        * Register these serializers for using Avro's {@link 
GenericData.Record} and classes
-        * implementing {@link org.apache.avro.specific.SpecificRecordBase}
-        */
-       private static void registerGenericAvro(ExecutionConfig reg) {
-               // Avro POJOs contain java.util.List which have 
GenericData.Array as their runtime type
-               // because Kryo is not able to serialize them properly, we use 
this serializer for them
-               reg.registerTypeWithKryoSerializer(GenericData.Array.class, 
SpecificInstanceCollectionSerializerForArrayList.class);
-               
-               // We register this serializer for users who want to use 
untyped Avro records (GenericData.Record).
-               // Kryo is able to serialize everything in there, except for 
the Schema.
-               // This serializer is very slow, but using the 
GenericData.Records of Kryo is in general a bad idea.
-               // we add the serializer as a default serializer because Avro 
is using a private sub-type at runtime.
-               reg.addDefaultKryoSerializer(Schema.class, 
AvroSchemaSerializer.class);
-       }
-
-       private static void registerSpecificAvro(ExecutionConfig reg, Class<? 
extends SpecificRecordBase> avroType) {
-               registerGenericAvro(reg);
-               // This rule only applies if users explicitly use the 
GenericTypeInformation for the avro types
-               // usually, we are able to handle Avro POJOs with the POJO 
serializer.
-               // (However only if the GenericData.Array type is registered!)
-
-       //      ClassTag<SpecificRecordBase> tag = 
scala.reflect.ClassTag$.MODULE$.apply(avroType);
-       //      reg.registerTypeWithKryoSerializer(avroType, 
com.twitter.chill.avro.AvroSerializer.SpecificRecordSerializer(tag));
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // Custom Serializers
-       // 
--------------------------------------------------------------------------------------------
-
-       @SuppressWarnings("rawtypes")
-       public static class SpecificInstanceCollectionSerializerForArrayList 
extends SpecificInstanceCollectionSerializer<ArrayList> {
-               private static final long serialVersionUID = 1L;
-
-               public SpecificInstanceCollectionSerializerForArrayList() {
-                       super(ArrayList.class);
-               }
-       }
-       /**
-        * Special serializer for Java collections enforcing certain instance 
types.
-        * Avro is serializing collections with an "GenericData.Array" type. 
Kryo is not able to handle
-        * this type, so we use ArrayLists.
-        */
-       @SuppressWarnings("rawtypes")
-       public static class SpecificInstanceCollectionSerializer<T extends 
Collection> 
-                       extends CollectionSerializer implements Serializable
-       {
-               private static final long serialVersionUID = 1L;
-               
-               private Class<T> type;
-               
-               public SpecificInstanceCollectionSerializer(Class<T> type) {
-                       this.type = type;
-               }
-
-               @Override
-               protected Collection create(Kryo kryo, Input input, 
Class<Collection> type) {
-                       return kryo.newInstance(this.type);
-               }
-
-               @Override
-               protected Collection createCopy(Kryo kryo, Collection original) 
{
-                       return kryo.newInstance(this.type);
-               }
-       }
-
-       /**
-        * Slow serialization approach for Avro schemas.
-        * This is only used with {{@link 
org.apache.avro.generic.GenericData.Record}} types.
-        * Having this serializer, we are able to handle avro Records.
-        */
-       public static class AvroSchemaSerializer extends Serializer<Schema> 
implements Serializable {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void write(Kryo kryo, Output output, Schema object) {
-                       String schemaAsString = object.toString(false);
-                       output.writeString(schemaAsString);
-               }
-
-               @Override
-               public Schema read(Kryo kryo, Input input, Class<Schema> type) {
-                       String schemaAsString = input.readString();
-                       // the parser seems to be stateful, to we need a new 
one for every type.
-                       Schema.Parser sParser = new Schema.Parser();
-                       return sParser.parse(schemaAsString);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/TypeExtractionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/TypeExtractionTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/TypeExtractionTest.java
new file mode 100644
index 0000000..fb20d78
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/TypeExtractionTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests concerning type extraction by ExecutionEnvironment methods.
+ */
+@SuppressWarnings("serial")
+public class TypeExtractionTest {
+       
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       @Test
+       public void testFunctionWithMissingGenericsAndReturns() {
+               
+               RichMapFunction function = new RichMapFunction() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public Object map(Object value) throws Exception {
+                               return null;
+                       }
+               };
+
+               TypeInformation<?> info = 
ExecutionEnvironment.getExecutionEnvironment()
+                               .fromElements("arbitrary", "data")
+                               
.map(function).returns("String").getResultType();
+
+               assertEquals(TypeInfoParser.parse("String"), info);
+       }
+
+       @Test
+       public void testGetterSetterWithVertex() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.fromElements(new VertexTyped(0L, 3.0), new VertexTyped(1L, 
1.0));
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Test types
+       // 
------------------------------------------------------------------------
+
+       public static class Vertex<K, V> {
+
+               private K key1;
+               private K key2;
+               private V value;
+
+               public Vertex() {}
+
+               public Vertex(K key, V value) {
+                       this.key1 = key;
+                       this.key2 = key;
+                       this.value = value;
+               }
+
+               public Vertex(K key1, K key2, V value) {
+                       this.key1 = key1;
+                       this.key2 = key2;
+                       this.value = value;
+               }
+
+               public void setKey1(K key1) {
+                       this.key1 = key1;
+               }
+
+               public void setKey2(K key2) {
+                       this.key2 = key2;
+               }
+
+               public K getKey1() {
+                       return key1;
+               }
+
+               public K getKey2() {
+                       return key2;
+               }
+
+               public void setValue(V value) {
+                       this.value = value;
+               }
+
+               public V getValue() {
+                       return value;
+               }
+       }
+
+       public static class VertexTyped extends Vertex<Long, Double>{
+               public VertexTyped(Long l, Double d) {
+                       super(l, d);
+               }
+               public VertexTyped() {
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/operators/ExpressionKeysTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operators/ExpressionKeysTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operators/ExpressionKeysTest.java
deleted file mode 100644
index dd82234..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operators/ExpressionKeysTest.java
+++ /dev/null
@@ -1,479 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.operators;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.Arrays;
-
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple7;
-import 
org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest.ComplexNestedClass;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import 
org.apache.flink.api.java.operators.SelectorFunctionKeysTest.KeySelector1;
-import 
org.apache.flink.api.java.operators.SelectorFunctionKeysTest.KeySelector3;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
-
-@SuppressWarnings("unused")
-@RunWith(PowerMockRunner.class)
-public class ExpressionKeysTest {
-
-       @Test
-       public void testBasicType() {
-
-               TypeInformation<Long> longType = BasicTypeInfo.LONG_TYPE_INFO;
-               ExpressionKeys<Long> ek = new ExpressionKeys<>("*", longType);
-
-               Assert.assertArrayEquals(new int[] {0}, 
ek.computeLogicalKeyPositions());
-       }
-
-       @Test(expected = InvalidProgramException.class)
-       public void testGenericNonKeyType() {
-               // Fail: GenericType cannot be used as key
-               TypeInformation<GenericNonKeyType> genericType = new 
GenericTypeInfo<>(GenericNonKeyType.class);
-               new ExpressionKeys<>("*", genericType);
-       }
-
-       @Test
-       public void testKeyGenericType() {
-
-               TypeInformation<GenericKeyType> genericType = new 
GenericTypeInfo<>(GenericKeyType.class);
-               ExpressionKeys<GenericKeyType> ek = new ExpressionKeys<>("*", 
genericType);
-
-               Assert.assertArrayEquals(new int[] {0}, 
ek.computeLogicalKeyPositions());
-       }
-
-       @Test
-       public void testTupleRangeCheck() throws IllegalAccessException, 
IllegalArgumentException, InvocationTargetException {
-               
-               // test private static final int[] rangeCheckFields(int[] 
fields, int maxAllowedField)
-               Method rangeCheckFieldsMethod = Whitebox.getMethod(Keys.class, 
"rangeCheckFields", int[].class, int.class);
-
-               // valid indexes
-               rangeCheckFieldsMethod.invoke(null, new int[]{1, 2, 3, 4}, 4);
-
-               // corner case tests
-               rangeCheckFieldsMethod.invoke(null, new int[] {0}, 0);
-
-               Throwable ex = null;
-               try {
-                       // throws illegal argument.
-                       rangeCheckFieldsMethod.invoke(null, new int[] {5}, 0);
-               } catch(Throwable iae) {
-                       ex = iae;
-               }
-               Assert.assertNotNull(ex);
-       }
-       
-       @Test
-       public void testStandardTupleKeys() {
-               TupleTypeInfo<Tuple7<String, String, String, String, String, 
String, String>> typeInfo = new TupleTypeInfo<>(
-                               
BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,
-                               
BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO);
-               
-               ExpressionKeys<Tuple7<String, String, String, String, String, 
String, String>> ek;
-               
-               for( int i = 1; i < 8; i++) {
-                       int[] ints = new int[i];
-                       for( int j = 0; j < i; j++) {
-                               ints[j] = j;
-                       }
-                       int[] inInts = Arrays.copyOf(ints, ints.length); // 
copy, just to make sure that the code is not cheating by changing the ints.
-                       ek = new ExpressionKeys<>(inInts, typeInfo);
-                       Assert.assertArrayEquals(ints, 
ek.computeLogicalKeyPositions());
-                       Assert.assertEquals(ints.length, 
ek.computeLogicalKeyPositions().length);
-                       
-                       ArrayUtils.reverse(ints);
-                       inInts = Arrays.copyOf(ints, ints.length);
-                       ek = new ExpressionKeys<>(inInts, typeInfo);
-                       Assert.assertArrayEquals(ints, 
ek.computeLogicalKeyPositions());
-                       Assert.assertEquals(ints.length, 
ek.computeLogicalKeyPositions().length);
-               }
-       }
-       
-       @Test 
-       public void testInvalidTuple() throws Throwable {
-               TupleTypeInfo<Tuple3<String, Tuple3<String, String, String>, 
String>> typeInfo = new TupleTypeInfo<>(
-                               BasicTypeInfo.STRING_TYPE_INFO,
-                               new TupleTypeInfo<Tuple3<String, String, 
String>>(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO),
-                               BasicTypeInfo.STRING_TYPE_INFO);
-               
-               String[][] tests = new String[][] {
-                               new String[] {"f0.f1"}, // nesting into unnested
-                               new String[] {"f11"},
-                               new String[] {"f-35"},
-                               new String[] {"f0.f33"},
-                               new String[] {"f1.f33"}
-               };
-               for (String[] test : tests) {
-                       Throwable e = null;
-                       try {
-                               new ExpressionKeys<>(test, typeInfo);
-                       } catch (Throwable t) {
-                               e = t;
-                       }
-                       Assert.assertNotNull(e);
-               }
-       }
-
-       @Test(expected = InvalidProgramException.class)
-       public void testTupleNonKeyField() {
-               // selected field is not a key type
-               TypeInformation<Tuple3<String, Long, GenericNonKeyType>> ti = 
new TupleTypeInfo<>(
-                       BasicTypeInfo.STRING_TYPE_INFO,
-                       BasicTypeInfo.LONG_TYPE_INFO,
-                       TypeExtractor.getForClass(GenericNonKeyType.class)
-               );
-
-               new ExpressionKeys<>(2, ti);
-       }
-       
-       @Test
-       public void testTupleKeyExpansion() {
-               TupleTypeInfo<Tuple3<String, Tuple3<String, String, String>, 
String>> typeInfo = new TupleTypeInfo<>(
-                               BasicTypeInfo.STRING_TYPE_INFO,
-                               new TupleTypeInfo<Tuple3<String, String, 
String>>(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO),
-                               BasicTypeInfo.STRING_TYPE_INFO);
-               ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, 
String>> fpk = 
-                               new ExpressionKeys<>(0, typeInfo);
-               Assert.assertArrayEquals(new int[] {0}, 
fpk.computeLogicalKeyPositions());
-               
-               fpk = new ExpressionKeys<>(1, typeInfo);
-               Assert.assertArrayEquals(new int[] {1,2,3}, 
fpk.computeLogicalKeyPositions());
-               
-               fpk = new ExpressionKeys<>(2, typeInfo);
-               Assert.assertArrayEquals(new int[] {4}, 
fpk.computeLogicalKeyPositions());
-               
-               fpk = new ExpressionKeys<>(new int[] {0,1,2}, typeInfo);
-               Assert.assertArrayEquals(new int[] {0,1,2,3,4}, 
fpk.computeLogicalKeyPositions());
-               
-               fpk = new ExpressionKeys<>(null, typeInfo, true); // empty case
-               Assert.assertArrayEquals(new int[] {0,1,2,3,4}, 
fpk.computeLogicalKeyPositions());
-               
-               fpk = new ExpressionKeys<>("*", typeInfo);
-               Assert.assertArrayEquals(new int[] {0,1,2,3,4}, 
fpk.computeLogicalKeyPositions());
-               
-               // scala style "select all"
-               fpk = new ExpressionKeys<>("_", typeInfo);
-               Assert.assertArrayEquals(new int[] {0,1,2,3,4}, 
fpk.computeLogicalKeyPositions());
-               
-               // this was a bug:
-               fpk = new ExpressionKeys<>("f2", typeInfo);
-               Assert.assertArrayEquals(new int[] {4}, 
fpk.computeLogicalKeyPositions());
-               
-               fpk = new ExpressionKeys<>(new String[] {"f0","f1.f0","f1.f1", 
"f1.f2", "f2"}, typeInfo);
-               Assert.assertArrayEquals(new int[] {0,1,2,3,4}, 
fpk.computeLogicalKeyPositions());
-               
-               fpk = new ExpressionKeys<>(new String[] {"f0","f1.f0","f1.f1", 
"f2"}, typeInfo);
-               Assert.assertArrayEquals(new int[] {0,1,2,4}, 
fpk.computeLogicalKeyPositions());
-               
-               fpk = new ExpressionKeys<>(new String[] {"f2", "f0"}, typeInfo);
-               Assert.assertArrayEquals(new int[] {4,0}, 
fpk.computeLogicalKeyPositions());
-               
-
-               TupleTypeInfo<Tuple3<String, Tuple3<Tuple3<String, String, 
String>, String, String>, String>> complexTypeInfo = new TupleTypeInfo<>(
-                               BasicTypeInfo.STRING_TYPE_INFO,
-                               new TupleTypeInfo<Tuple3<Tuple3<String, String, 
String>, String, String>>(new TupleTypeInfo<Tuple3<String, String, 
String>>(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO),BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO),
-                               BasicTypeInfo.STRING_TYPE_INFO);
-               
-               ExpressionKeys<Tuple3<String, Tuple3<Tuple3<String, String, 
String>, String, String>, String>> complexFpk = 
-               new ExpressionKeys<>(0, complexTypeInfo);
-               Assert.assertArrayEquals(new int[] {0}, 
complexFpk.computeLogicalKeyPositions());
-               
-               complexFpk = new ExpressionKeys<>(new int[] {0,1,2}, 
complexTypeInfo);
-               Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6}, 
complexFpk.computeLogicalKeyPositions());
-               
-               complexFpk = new ExpressionKeys<>("*", complexTypeInfo);
-               Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6}, 
complexFpk.computeLogicalKeyPositions());
-               
-               // scala style select all
-               complexFpk = new ExpressionKeys<>("_", complexTypeInfo);
-               Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6}, 
complexFpk.computeLogicalKeyPositions());
-               
-               complexFpk = new ExpressionKeys<>("f1.f0.*", complexTypeInfo);
-               Assert.assertArrayEquals(new int[] {1,2,3}, 
complexFpk.computeLogicalKeyPositions());
-
-               complexFpk = new ExpressionKeys<>("f1.f0", complexTypeInfo);
-               Assert.assertArrayEquals(new int[] {1,2,3}, 
complexFpk.computeLogicalKeyPositions());
-               
-               complexFpk = new ExpressionKeys<>("f2", complexTypeInfo);
-               Assert.assertArrayEquals(new int[] {6}, 
complexFpk.computeLogicalKeyPositions());
-       }
-
-       @Test
-       public void testPojoKeys() {
-               TypeInformation<PojoWithMultiplePojos> ti = 
TypeExtractor.getForClass(PojoWithMultiplePojos.class);
-               ExpressionKeys<PojoWithMultiplePojos> ek;
-               ek = new ExpressionKeys<>("*", ti);
-               Assert.assertArrayEquals(new int[] {0,1,2,3,4}, 
ek.computeLogicalKeyPositions());
-
-               ek = new ExpressionKeys<>("p1.*", ti);
-               Assert.assertArrayEquals(new int[] {1,2}, 
ek.computeLogicalKeyPositions());
-
-               ek = new ExpressionKeys<>("p2.*", ti);
-               Assert.assertArrayEquals(new int[] {3,4}, 
ek.computeLogicalKeyPositions());
-
-               ek = new ExpressionKeys<>("p1", ti);
-               Assert.assertArrayEquals(new int[] {1,2}, 
ek.computeLogicalKeyPositions());
-
-               ek = new ExpressionKeys<>("p2", ti);
-               Assert.assertArrayEquals(new int[] {3,4}, 
ek.computeLogicalKeyPositions());
-
-               ek = new ExpressionKeys<>("i0", ti);
-               Assert.assertArrayEquals(new int[] {0}, 
ek.computeLogicalKeyPositions());
-       }
-
-       @Test
-       public void testTupleWithNestedPojo() {
-
-               TypeInformation<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>> 
ti =
-                               new TupleTypeInfo<>(
-                                       BasicTypeInfo.INT_TYPE_INFO,
-                                       TypeExtractor.getForClass(Pojo1.class),
-                                       
TypeExtractor.getForClass(PojoWithMultiplePojos.class)
-                               );
-
-               ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>> 
ek;
-
-               ek = new ExpressionKeys<>(0, ti);
-               Assert.assertArrayEquals(new int[] {0}, 
ek.computeLogicalKeyPositions());
-
-               ek = new ExpressionKeys<>(1, ti);
-               Assert.assertArrayEquals(new int[] {1,2}, 
ek.computeLogicalKeyPositions());
-
-               ek = new ExpressionKeys<>(2, ti);
-               Assert.assertArrayEquals(new int[] {3,4,5,6,7}, 
ek.computeLogicalKeyPositions());
-
-               ek = new ExpressionKeys<>(new int[]{}, ti, true);
-               Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6,7}, 
ek.computeLogicalKeyPositions());
-
-               ek = new ExpressionKeys<>("*", ti);
-               Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6,7}, 
ek.computeLogicalKeyPositions());
-
-       }
-
-       @Test(expected = InvalidProgramException.class)
-       public void testNonKeyPojoField() {
-               // selected field is not a key type
-               TypeInformation<PojoWithNonKeyField> ti = 
TypeExtractor.getForClass(PojoWithNonKeyField.class);
-               new ExpressionKeys<>("b", ti);
-       }
-
-       @Test
-       public void testInvalidPojo() throws Throwable {
-               TypeInformation<ComplexNestedClass> ti = 
TypeExtractor.getForClass(ComplexNestedClass.class);
-
-               String[][] tests = new String[][] {
-                       new String[] {"nonexistent"},
-                       new String[] {"date.abc"} // nesting into unnested
-               };
-               for (String[] test : tests) {
-                       Throwable e = null;
-                       try {
-                               new ExpressionKeys<>(test, ti);
-                       } catch (Throwable t) {
-                               e = t;
-                       }
-                       Assert.assertNotNull(e);
-               }
-       }
-
-       @Test
-       public void testAreCompatible1() throws Keys.IncompatibleKeysException {
-               TypeInformation<Pojo1> t1 = 
TypeExtractor.getForClass(Pojo1.class);
-
-               ExpressionKeys<Pojo1> ek1 = new ExpressionKeys<>("a", t1);
-               ExpressionKeys<Pojo1> ek2 = new ExpressionKeys<>("b", t1);
-
-               Assert.assertTrue(ek1.areCompatible(ek2));
-               Assert.assertTrue(ek2.areCompatible(ek1));
-       }
-
-       @Test
-       public void testAreCompatible2() throws Keys.IncompatibleKeysException {
-               TypeInformation<Pojo1> t1 = 
TypeExtractor.getForClass(Pojo1.class);
-               TypeInformation<Tuple2<String, Long>> t2 = new TupleTypeInfo<>(
-                       BasicTypeInfo.STRING_TYPE_INFO,
-                       BasicTypeInfo.LONG_TYPE_INFO
-               );
-
-               ExpressionKeys<Pojo1> ek1 = new ExpressionKeys<>("a", t1);
-               ExpressionKeys<Tuple2<String, Long>> ek2 = new 
ExpressionKeys<>(0, t2);
-
-               Assert.assertTrue(ek1.areCompatible(ek2));
-               Assert.assertTrue(ek2.areCompatible(ek1));
-       }
-
-       @Test
-       public void testAreCompatible3() throws Keys.IncompatibleKeysException {
-               TypeInformation<String> t1 = BasicTypeInfo.STRING_TYPE_INFO;
-               TypeInformation<Tuple2<String, Long>> t2 = new TupleTypeInfo<>(
-                       BasicTypeInfo.STRING_TYPE_INFO,
-                       BasicTypeInfo.LONG_TYPE_INFO
-               );
-
-               ExpressionKeys<String> ek1 = new ExpressionKeys<>("*", t1);
-               ExpressionKeys<Tuple2<String, Long>> ek2 = new 
ExpressionKeys<>(0, t2);
-
-               Assert.assertTrue(ek1.areCompatible(ek2));
-               Assert.assertTrue(ek2.areCompatible(ek1));
-       }
-
-       @Test
-       public void testAreCompatible4() throws Keys.IncompatibleKeysException {
-               TypeInformation<PojoWithMultiplePojos> t1 = 
TypeExtractor.getForClass(PojoWithMultiplePojos.class);
-               TypeInformation<Tuple3<String, Long, Integer>> t2 = new 
TupleTypeInfo<>(
-                       BasicTypeInfo.STRING_TYPE_INFO,
-                       BasicTypeInfo.LONG_TYPE_INFO,
-                       BasicTypeInfo.INT_TYPE_INFO
-               );
-
-               ExpressionKeys<PojoWithMultiplePojos> ek1 = new 
ExpressionKeys<>(new String[]{"p1", "i0"}, t1);
-               ExpressionKeys<Tuple3<String, Long, Integer>> ek2 = new 
ExpressionKeys<>(new int[]{0, 0, 2}, t2);
-
-               Assert.assertTrue(ek1.areCompatible(ek2));
-               Assert.assertTrue(ek2.areCompatible(ek1));
-       }
-
-       @Test
-       public void testAreCompatible5() throws Keys.IncompatibleKeysException {
-               TypeInformation<PojoWithMultiplePojos> t1 = 
TypeExtractor.getForClass(PojoWithMultiplePojos.class);
-               TypeInformation<Tuple2<String, String>> t2 = new 
TupleTypeInfo<>(
-                       BasicTypeInfo.STRING_TYPE_INFO,
-                       BasicTypeInfo.STRING_TYPE_INFO
-               );
-
-               ExpressionKeys<PojoWithMultiplePojos> ek1 = new 
ExpressionKeys<>(new String[]{"p1.b", "p2.a2"}, t1);
-               ExpressionKeys<Tuple2<String, String>> ek2 = new 
ExpressionKeys<>("*", t2);
-
-               Assert.assertTrue(ek1.areCompatible(ek2));
-               Assert.assertTrue(ek2.areCompatible(ek1));
-       }
-
-       @Test(expected = Keys.IncompatibleKeysException.class)
-       public void testAreCompatible6() throws Keys.IncompatibleKeysException {
-               TypeInformation<Pojo1> t1 = 
TypeExtractor.getForClass(Pojo1.class);
-               TypeInformation<Tuple2<String, Long>> t2 = new TupleTypeInfo<>(
-                       BasicTypeInfo.STRING_TYPE_INFO,
-                       BasicTypeInfo.LONG_TYPE_INFO
-               );
-
-               ExpressionKeys<Pojo1> ek1 = new ExpressionKeys<>("a", t1);
-               ExpressionKeys<Tuple2<String, Long>> ek2 = new 
ExpressionKeys<>(1, t2);
-
-               ek1.areCompatible(ek2);
-       }
-
-       @Test(expected = Keys.IncompatibleKeysException.class)
-       public void testAreCompatible7() throws Keys.IncompatibleKeysException {
-               TypeInformation<Pojo1> t1 = 
TypeExtractor.getForClass(Pojo1.class);
-               TypeInformation<Tuple2<String, Long>> t2 = new TupleTypeInfo<>(
-                       BasicTypeInfo.STRING_TYPE_INFO,
-                       BasicTypeInfo.LONG_TYPE_INFO
-               );
-
-               ExpressionKeys<Pojo1> ek1 = new ExpressionKeys<>(new 
String[]{"a", "b"}, t1);
-               ExpressionKeys<Tuple2<String, Long>> ek2 = new 
ExpressionKeys<>(0, t2);
-
-               ek1.areCompatible(ek2);
-       }
-
-       @Test
-       public void testAreCompatible8() throws Keys.IncompatibleKeysException {
-               TypeInformation<String> t1 = BasicTypeInfo.STRING_TYPE_INFO;
-               TypeInformation<Pojo2> t2 = 
TypeExtractor.getForClass(Pojo2.class);
-
-               ExpressionKeys<String> ek1 = new ExpressionKeys<>("*", t1);
-               Keys<Pojo2> ek2 = new Keys.SelectorFunctionKeys<>(
-                               new KeySelector1(),
-                               t2,
-                               BasicTypeInfo.STRING_TYPE_INFO
-                       );
-
-               Assert.assertTrue(ek1.areCompatible(ek2));
-       }
-
-       @Test
-       public void testAreCompatible9() throws Keys.IncompatibleKeysException {
-               TypeInformation<Tuple3<String, Long, Integer>> t1 = new 
TupleTypeInfo<>(
-                       BasicTypeInfo.STRING_TYPE_INFO,
-                       BasicTypeInfo.LONG_TYPE_INFO,
-                       BasicTypeInfo.INT_TYPE_INFO
-               );
-               TypeInformation<PojoWithMultiplePojos> t2 = 
TypeExtractor.getForClass(PojoWithMultiplePojos.class);
-
-               ExpressionKeys<Tuple3<String, Long, Integer>> ek1 = new 
ExpressionKeys<>(new int[]{2,0}, t1);
-               Keys<PojoWithMultiplePojos> ek2 = new 
Keys.SelectorFunctionKeys<>(
-                       new KeySelector3(),
-                       t2,
-                       new TupleTypeInfo<Tuple2<Integer, 
String>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
-               );
-
-               Assert.assertTrue(ek1.areCompatible(ek2));
-       }
-
-       public static class Pojo1 {
-               public String a;
-               public String b;
-       }
-
-       public static class Pojo2 {
-               public String a2;
-               public String b2;
-       }
-
-       public static class PojoWithMultiplePojos {
-               public Pojo1 p1;
-               public Pojo2 p2;
-               public Integer i0;
-       }
-
-       public static class PojoWithNonKeyField {
-               public String a;
-               public GenericNonKeyType b;
-       }
-
-       public static class GenericNonKeyType {
-               private String a;
-               private String b;
-       }
-
-       public static class GenericKeyType implements 
Comparable<GenericNonKeyType> {
-               private String a;
-               private String b;
-
-               @Override
-               public int compareTo(GenericNonKeyType o) {
-                       return 0;
-               }
-       }
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java
index 5c2c7e9..09a705c 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java
@@ -48,7 +48,7 @@ public class NamesTest implements Serializable {
        public void testDefaultName() {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-               DataSet<String> strs = env.fromCollection(Arrays.asList( new 
String[] {"a", "b"}));
+               DataSet<String> strs = env.fromCollection(Arrays.asList("a", 
"b"));
 
 
                // WARNING: The test will fail if this line is being moved down 
in the file (the line-number is hard-coded)
@@ -68,7 +68,7 @@ public class NamesTest implements Serializable {
        public void testGivenName() {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-               DataSet<String> strs = env.fromCollection(Arrays.asList( new 
String[] {"a", "b"}));
+               DataSet<String> strs = env.fromCollection(Arrays.asList("a", 
"b"));
                strs.filter(new FilterFunction<String>() {
                        private static final long serialVersionUID = 1L;
                        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/operators/SelectorFunctionKeysTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operators/SelectorFunctionKeysTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operators/SelectorFunctionKeysTest.java
deleted file mode 100644
index f59af6e..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operators/SelectorFunctionKeysTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.operators;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import 
org.apache.flink.api.java.operators.ExpressionKeysTest.PojoWithMultiplePojos;
-import org.apache.flink.api.java.operators.ExpressionKeysTest.Pojo1;
-import org.apache.flink.api.java.operators.ExpressionKeysTest.Pojo2;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class SelectorFunctionKeysTest {
-
-       @Test
-       public void testAreCompatible1() throws Keys.IncompatibleKeysException {
-               TypeInformation<Pojo2> t1 = 
TypeExtractor.getForClass(Pojo2.class);
-               TypeInformation<Tuple2<Integer, String>> t2 =
-                       new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               Keys<Pojo2> k1 = new Keys.SelectorFunctionKeys<>(
-                       new KeySelector1(),
-                       t1,
-                       BasicTypeInfo.STRING_TYPE_INFO
-               );
-               Keys<Tuple2<Integer, String>> k2 = new 
Keys.SelectorFunctionKeys<>(
-                       new KeySelector2(),
-                       t2,
-                       BasicTypeInfo.STRING_TYPE_INFO
-               );
-
-               Assert.assertTrue(k1.areCompatible(k2));
-               Assert.assertTrue(k2.areCompatible(k1));
-       }
-
-       @Test
-       public void testAreCompatible2() throws Keys.IncompatibleKeysException {
-               TypeInformation<PojoWithMultiplePojos> t1 = 
TypeExtractor.getForClass(PojoWithMultiplePojos.class);
-               TypeInformation<Tuple3<Long, Pojo1, Integer>> t2 = new 
TupleTypeInfo<>(
-                       BasicTypeInfo.LONG_TYPE_INFO,
-                       TypeExtractor.getForClass(Pojo1.class),
-                       BasicTypeInfo.INT_TYPE_INFO);
-               TypeInformation<Tuple2<Integer, String>> kt = new 
TupleTypeInfo<>(
-                       BasicTypeInfo.INT_TYPE_INFO,
-                       BasicTypeInfo.STRING_TYPE_INFO
-               );
-
-               Keys<PojoWithMultiplePojos> k1 = new 
Keys.SelectorFunctionKeys<>(
-                       new KeySelector3(),
-                       t1,
-                       kt
-               );
-               Keys<Tuple3<Long, Pojo1, Integer>> k2 = new 
Keys.SelectorFunctionKeys<>(
-                       new KeySelector4(),
-                       t2,
-                       kt
-               );
-
-               Assert.assertTrue(k1.areCompatible(k2));
-               Assert.assertTrue(k2.areCompatible(k1));
-       }
-
-       @Test
-       public void testAreCompatible3() throws Keys.IncompatibleKeysException {
-               TypeInformation<String> t1 = BasicTypeInfo.STRING_TYPE_INFO;
-               TypeInformation<Pojo2> t2 = 
TypeExtractor.getForClass(Pojo2.class);
-
-               Keys.ExpressionKeys<String> ek1 = new 
Keys.ExpressionKeys<>("*", t1);
-               Keys<Pojo2> sk2 = new Keys.SelectorFunctionKeys<>(
-                       new KeySelector1(),
-                       t2,
-                       BasicTypeInfo.STRING_TYPE_INFO
-               );
-
-               Assert.assertTrue(sk2.areCompatible(ek1));
-       }
-
-       @Test
-       public void testAreCompatible4() throws Keys.IncompatibleKeysException {
-               TypeInformation<Tuple3<String, Long, Integer>> t1 = new 
TupleTypeInfo<>(
-                       BasicTypeInfo.STRING_TYPE_INFO,
-                       BasicTypeInfo.LONG_TYPE_INFO,
-                       BasicTypeInfo.INT_TYPE_INFO
-               );
-               TypeInformation<PojoWithMultiplePojos> t2 = 
TypeExtractor.getForClass(PojoWithMultiplePojos.class);
-
-               Keys.ExpressionKeys<Tuple3<String, Long, Integer>> ek1 = new 
Keys.ExpressionKeys<>(new int[]{2,0}, t1);
-               Keys<PojoWithMultiplePojos> sk2 = new 
Keys.SelectorFunctionKeys<>(
-                       new KeySelector3(),
-                       t2,
-                       new TupleTypeInfo<Tuple2<Integer, 
String>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
-               );
-
-               Assert.assertTrue(sk2.areCompatible(ek1));
-       }
-
-       @SuppressWarnings("serial")
-       public static class KeySelector1 implements KeySelector<Pojo2, String> {
-
-               @Override
-               public String getKey(Pojo2 v) throws Exception {
-                       return v.b2;
-               }
-       }
-
-       @SuppressWarnings("serial")
-       public static class KeySelector2 implements KeySelector<Tuple2<Integer, 
String>, String> {
-
-               @Override
-               public String getKey(Tuple2<Integer, String> v) throws 
Exception {
-                       return v.f1;
-               }
-       }
-
-       @SuppressWarnings("serial")
-       public static class KeySelector3 implements 
KeySelector<PojoWithMultiplePojos, Tuple2<Integer, String>> {
-
-               @Override
-               public Tuple2<Integer, String> getKey(PojoWithMultiplePojos v) 
throws Exception {
-                       return new Tuple2<>(v.i0, v.p1.b);
-               }
-       }
-
-       @SuppressWarnings("serial")
-       public static class KeySelector4 implements KeySelector<Tuple3<Long, 
Pojo1, Integer>, Tuple2<Integer, String>> {
-
-               @Override
-               public Tuple2<Integer, String> getKey(Tuple3<Long, Pojo1, 
Integer> v) throws Exception {
-                       return new Tuple2<>(v.f2, v.f1.a);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
index 8d8f801..a1ccfe4 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
@@ -35,7 +35,7 @@ import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
 import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java 
b/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
deleted file mode 100644
index 0cc5dcf..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.tuple;
-
-import org.apache.flink.types.NullFieldException;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class Tuple2Test {
-
-       @Test
-       public void testSwapValues() {
-               Tuple2<String, Integer> toSwap = new Tuple2<>("Test case", 25);
-               Tuple2<Integer, String> swapped = toSwap.swap();
-
-               Assert.assertEquals(swapped.f0, toSwap.f1);
-
-               Assert.assertEquals(swapped.f1, toSwap.f0);
-       }
-       
-       @Test(expected = NullFieldException.class)
-       public void testGetFieldNotNull() {
-               Tuple2<String, Integer> tuple = new Tuple2<>("Test case", null);
-
-               Assert.assertEquals("Test case", tuple.getFieldNotNull(0));
-               tuple.getFieldNotNull(1);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
deleted file mode 100644
index 96ba264..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
+++ /dev/null
@@ -1,876 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.type.extractor;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.PojoField;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.api.java.typeutils.TypeInfoParserTest.MyWritable;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.HashMultiset;
-
-/**
- *  Pojo Type tests
- *
- *  A Pojo is a bean-style class with getters, setters and empty ctor
- *   OR a class with all fields public (or for every private field, there has 
to be a public getter/setter)
- *   everything else is a generic type (that can't be used for field selection)
- */
-public class PojoTypeExtractionTest {
-
-       public static class HasDuplicateField extends WC {
-               @SuppressWarnings("unused")
-               private int count; // duplicate
-       }
-
-       @Test(expected=RuntimeException.class)
-       public void testDuplicateFieldException() {
-               TypeExtractor.createTypeInfo(HasDuplicateField.class);
-       }
-
-       // test with correct pojo types
-       public static class WC { // is a pojo
-               public ComplexNestedClass complex; // is a pojo
-               private int count; // is a BasicType
-
-               public WC() {
-               }
-               public int getCount() {
-                       return count;
-               }
-               public void setCount(int c) {
-                       this.count = c;
-               }
-       }
-       public static class ComplexNestedClass { // pojo
-               public static int ignoreStaticField;
-               public transient int ignoreTransientField;
-               public Date date; // generic type
-               public Integer someNumberWithÜnicödeNäme; // BasicType
-               public float someFloat; // BasicType
-               public Tuple3<Long, Long, String> word; //Tuple Type with three 
basic types
-               public Object nothing; // generic type
-               public MyWritable hadoopCitizen;  // writableType
-               public List<String> collection;
-       }
-
-       // all public test
-       public static class AllPublic extends ComplexNestedClass {
-               public ArrayList<String> somethingFancy; // generic type
-               public HashMultiset<Integer> fancyIds; // generic type
-               public String[] fancyArray;                      // generic type
-       }
-
-       public static class ParentSettingGenerics extends 
PojoWithGenerics<Integer, Long> {
-               public String field3;
-       }
-       public static class PojoWithGenerics<T1, T2> {
-               public int key;
-               public T1 field1;
-               public T2 field2;
-       }
-
-       public static class ComplexHierarchyTop extends 
ComplexHierarchy<Tuple1<String>> {}
-       public static class ComplexHierarchy<T> extends 
PojoWithGenerics<FromTuple,T> {}
-
-       // extends from Tuple and adds a field
-       public static class FromTuple extends Tuple3<String, String, Long> {
-               private static final long serialVersionUID = 1L;
-               public int special;
-       }
-
-       public static class IncorrectPojo {
-               private int isPrivate;
-               public int getIsPrivate() {
-                       return isPrivate;
-               }
-               // setter is missing (intentional)
-       }
-
-       // correct pojo
-       public static class BeanStylePojo {
-               public String abc;
-               private int field;
-               public int getField() {
-                       return this.field;
-               }
-               public void setField(int f) {
-                       this.field = f;
-               }
-       }
-       public static class WrongCtorPojo {
-               public int a;
-               public WrongCtorPojo(int a) {
-                       this.a = a;
-               }
-       }
-
-       public static class PojoWithGenericFields {
-               private Collection<String> users;
-               private boolean favorited;
-
-               public boolean isFavorited() {
-                       return favorited;
-               }
-
-               public void setFavorited(boolean favorited) {
-                       this.favorited = favorited;
-               }
-
-               public Collection<String> getUsers() {
-                       return users;
-               }
-
-               public void setUsers(Collection<String> users) {
-                       this.users = users;
-               }
-       }
-       @Test
-       public void testPojoWithGenericFields() {
-               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(PojoWithGenericFields.class);
-
-               Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
-       }
-
-
-       // in this test, the location of the getters and setters is mixed 
across the type hierarchy.
-       public static class TypedPojoGetterSetterCheck extends 
GenericPojoGetterSetterCheck<String> {
-               public void setPackageProtected(String in) {
-                       this.packageProtected = in;
-               }
-       }
-       public static class GenericPojoGetterSetterCheck<T> {
-               T packageProtected;
-               public T getPackageProtected() {
-                       return packageProtected;
-               }
-       }
-
-       @Test
-       public void testIncorrectPojos() {
-               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(IncorrectPojo.class);
-               Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>);
-
-               typeForClass = 
TypeExtractor.createTypeInfo(WrongCtorPojo.class);
-               Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>);
-       }
-
-       @Test
-       public void testCorrectPojos() {
-               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(BeanStylePojo.class);
-               Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
-
-               typeForClass = 
TypeExtractor.createTypeInfo(TypedPojoGetterSetterCheck.class);
-               Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
-       }
-
-       @Test
-       public void testPojoWC() {
-               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(WC.class);
-               checkWCPojoAsserts(typeForClass);
-
-               WC t = new WC();
-               t.complex = new ComplexNestedClass();
-               TypeInformation<?> typeForObject = 
TypeExtractor.getForObject(t);
-               checkWCPojoAsserts(typeForObject);
-       }
-
-       @SuppressWarnings({ "unchecked", "rawtypes" })
-       private void checkWCPojoAsserts(TypeInformation<?> typeInfo) {
-               Assert.assertFalse(typeInfo.isBasicType());
-               Assert.assertFalse(typeInfo.isTupleType());
-               Assert.assertEquals(10, typeInfo.getTotalFields());
-               Assert.assertTrue(typeInfo instanceof PojoTypeInfo);
-               PojoTypeInfo<?> pojoType = (PojoTypeInfo<?>) typeInfo;
-
-               List<FlatFieldDescriptor> ffd = new 
ArrayList<FlatFieldDescriptor>();
-               String[] fields = {"count",
-                               "complex.date",
-                               "complex.hadoopCitizen",
-                               "complex.collection",
-                               "complex.nothing",
-                               "complex.someFloat",
-                               "complex.someNumberWithÜnicödeNäme",
-                               "complex.word.f0",
-                               "complex.word.f1",
-                               "complex.word.f2"};
-               int[] positions = {9,
-                               1,
-                               2,
-                               0,
-                               3,
-                               4,
-                               5,
-                               6,
-                               7,
-                               8};
-               Assert.assertEquals(fields.length, positions.length);
-               for(int i = 0; i < fields.length; i++) {
-                       pojoType.getFlatFields(fields[i], 0, ffd);
-                       Assert.assertEquals("Too many keys returned", 1, 
ffd.size());
-                       Assert.assertEquals("position of field "+fields[i]+" 
wrong", positions[i], ffd.get(0).getPosition());
-                       ffd.clear();
-               }
-
-               pojoType.getFlatFields("complex.word.*", 0, ffd);
-               Assert.assertEquals(3, ffd.size());
-               // check if it returns 5,6,7
-               for(FlatFieldDescriptor ffdE : ffd) {
-                       final int pos = ffdE.getPosition();
-                       Assert.assertTrue(pos <= 8 );
-                       Assert.assertTrue(6 <= pos );
-                       if(pos == 6) {
-                               Assert.assertEquals(Long.class, 
ffdE.getType().getTypeClass());
-                       }
-                       if(pos == 7) {
-                               Assert.assertEquals(Long.class, 
ffdE.getType().getTypeClass());
-                       }
-                       if(pos == 8) {
-                               Assert.assertEquals(String.class, 
ffdE.getType().getTypeClass());
-                       }
-               }
-               ffd.clear();
-
-               // scala style full tuple selection for pojos
-               pojoType.getFlatFields("complex.word._", 0, ffd);
-               Assert.assertEquals(3, ffd.size());
-               ffd.clear();
-
-               pojoType.getFlatFields("complex.*", 0, ffd);
-               Assert.assertEquals(9, ffd.size());
-               // check if it returns 0-7
-               for(FlatFieldDescriptor ffdE : ffd) {
-                       final int pos = ffdE.getPosition();
-                       Assert.assertTrue(ffdE.getPosition() <= 8 );
-                       Assert.assertTrue(0 <= ffdE.getPosition() );
-
-                       if(pos == 0) {
-                               Assert.assertEquals(List.class, 
ffdE.getType().getTypeClass());
-                       }
-                       if(pos == 1) {
-                               Assert.assertEquals(Date.class, 
ffdE.getType().getTypeClass());
-                       }
-                       if(pos == 2) {
-                               Assert.assertEquals(MyWritable.class, 
ffdE.getType().getTypeClass());
-                       }
-                       if(pos == 3) {
-                               Assert.assertEquals(Object.class, 
ffdE.getType().getTypeClass());
-                       }
-                       if(pos == 4) {
-                               Assert.assertEquals(Float.class, 
ffdE.getType().getTypeClass());
-                       }
-                       if(pos == 5) {
-                               Assert.assertEquals(Integer.class, 
ffdE.getType().getTypeClass());
-                       }
-                       if(pos == 6) {
-                               Assert.assertEquals(Long.class, 
ffdE.getType().getTypeClass());
-                       }
-                       if(pos == 7) {
-                               Assert.assertEquals(Long.class, 
ffdE.getType().getTypeClass());
-                       }
-                       if(pos == 8) {
-                               Assert.assertEquals(String.class, 
ffdE.getType().getTypeClass());
-                       }
-                       if(pos == 9) {
-                               Assert.assertEquals(Integer.class, 
ffdE.getType().getTypeClass());
-                       }
-               }
-               ffd.clear();
-
-               pojoType.getFlatFields("*", 0, ffd);
-               Assert.assertEquals(10, ffd.size());
-               // check if it returns 0-8
-               for(FlatFieldDescriptor ffdE : ffd) {
-                       Assert.assertTrue(ffdE.getPosition() <= 9 );
-                       Assert.assertTrue(0 <= ffdE.getPosition() );
-                       if(ffdE.getPosition() == 9) {
-                               Assert.assertEquals(Integer.class, 
ffdE.getType().getTypeClass());
-                       }
-               }
-               ffd.clear();
-
-               TypeInformation<?> typeComplexNested = pojoType.getTypeAt(0); 
// ComplexNestedClass complex
-               Assert.assertTrue(typeComplexNested instanceof PojoTypeInfo);
-
-               Assert.assertEquals(7, typeComplexNested.getArity());
-               Assert.assertEquals(9, typeComplexNested.getTotalFields());
-               PojoTypeInfo<?> pojoTypeComplexNested = (PojoTypeInfo<?>) 
typeComplexNested;
-
-               boolean dateSeen = false, intSeen = false, floatSeen = false,
-                               tupleSeen = false, objectSeen = false, 
writableSeen = false, collectionSeen = false;
-               for(int i = 0; i < pojoTypeComplexNested.getArity(); i++) {
-                       PojoField field = 
pojoTypeComplexNested.getPojoFieldAt(i);
-                       String name = field.getField().getName();
-                       if(name.equals("date")) {
-                               if(dateSeen) {
-                                       Assert.fail("already seen");
-                               }
-                               dateSeen = true;
-                               
Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, field.getTypeInformation());
-                               Assert.assertEquals(Date.class, 
field.getTypeInformation().getTypeClass());
-                       } else if(name.equals("someNumberWithÜnicödeNäme")) {
-                               if(intSeen) {
-                                       Assert.fail("already seen");
-                               }
-                               intSeen = true;
-                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
-                               Assert.assertEquals(Integer.class, 
field.getTypeInformation().getTypeClass());
-                       } else if(name.equals("someFloat")) {
-                               if(floatSeen) {
-                                       Assert.fail("already seen");
-                               }
-                               floatSeen = true;
-                               
Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, field.getTypeInformation());
-                               Assert.assertEquals(Float.class, 
field.getTypeInformation().getTypeClass());
-                       } else if(name.equals("word")) {
-                               if(tupleSeen) {
-                                       Assert.fail("already seen");
-                               }
-                               tupleSeen = true;
-                               Assert.assertTrue(field.getTypeInformation() 
instanceof TupleTypeInfo<?>);
-                               Assert.assertEquals(Tuple3.class, 
field.getTypeInformation().getTypeClass());
-                               // do some more advanced checks on the tuple
-                               TupleTypeInfo<?> tupleTypeFromComplexNested = 
(TupleTypeInfo<?>) field.getTypeInformation();
-                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, 
tupleTypeFromComplexNested.getTypeAt(0));
-                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, 
tupleTypeFromComplexNested.getTypeAt(1));
-                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
tupleTypeFromComplexNested.getTypeAt(2));
-                       } else if(name.equals("nothing")) {
-                               if(objectSeen) {
-                                       Assert.fail("already seen");
-                               }
-                               objectSeen = true;
-                               Assert.assertEquals(new 
GenericTypeInfo<Object>(Object.class), field.getTypeInformation());
-                               Assert.assertEquals(Object.class, 
field.getTypeInformation().getTypeClass());
-                       } else if(name.equals("hadoopCitizen")) {
-                               if(writableSeen) {
-                                       Assert.fail("already seen");
-                               }
-                               writableSeen = true;
-                               Assert.assertEquals(new 
WritableTypeInfo<MyWritable>(MyWritable.class), field.getTypeInformation());
-                               Assert.assertEquals(MyWritable.class, 
field.getTypeInformation().getTypeClass());
-                       } else if(name.equals("collection")) {
-                               if(collectionSeen) {
-                                       Assert.fail("already seen");
-                               }
-                               collectionSeen = true;
-                               Assert.assertEquals(new 
GenericTypeInfo(List.class), field.getTypeInformation());
-
-                       } else {
-                               Assert.fail("field "+field+" is not expected");
-                       }
-               }
-               Assert.assertTrue("Field was not present", dateSeen);
-               Assert.assertTrue("Field was not present", intSeen);
-               Assert.assertTrue("Field was not present", floatSeen);
-               Assert.assertTrue("Field was not present", tupleSeen);
-               Assert.assertTrue("Field was not present", objectSeen);
-               Assert.assertTrue("Field was not present", writableSeen);
-               Assert.assertTrue("Field was not present", collectionSeen);
-
-               TypeInformation<?> typeAtOne = pojoType.getTypeAt(1); // int 
count
-               Assert.assertTrue(typeAtOne instanceof BasicTypeInfo);
-
-               Assert.assertEquals(typeInfo.getTypeClass(), WC.class);
-               Assert.assertEquals(typeInfo.getArity(), 2);
-       }
-
-       @Test
-       public void testPojoAllPublic() {
-               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(AllPublic.class);
-               checkAllPublicAsserts(typeForClass);
-
-               TypeInformation<?> typeForObject = 
TypeExtractor.getForObject(new AllPublic() );
-               checkAllPublicAsserts(typeForObject);
-       }
-
-       private void checkAllPublicAsserts(TypeInformation<?> typeInformation) {
-               Assert.assertTrue(typeInformation instanceof PojoTypeInfo);
-               Assert.assertEquals(10, typeInformation.getArity());
-               Assert.assertEquals(12, typeInformation.getTotalFields());
-               // check if the three additional fields are identified correctly
-               boolean arrayListSeen = false, multisetSeen = false, 
strArraySeen = false;
-               PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeInformation;
-               for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
-                       PojoField field = pojoTypeForClass.getPojoFieldAt(i);
-                       String name = field.getField().getName();
-                       if(name.equals("somethingFancy")) {
-                               if(arrayListSeen) {
-                                       Assert.fail("already seen");
-                               }
-                               arrayListSeen = true;
-                               Assert.assertTrue(field.getTypeInformation() 
instanceof GenericTypeInfo);
-                               Assert.assertEquals(ArrayList.class, 
field.getTypeInformation().getTypeClass());
-                       } else if(name.equals("fancyIds")) {
-                               if(multisetSeen) {
-                                       Assert.fail("already seen");
-                               }
-                               multisetSeen = true;
-                               Assert.assertTrue(field.getTypeInformation() 
instanceof GenericTypeInfo);
-                               Assert.assertEquals(HashMultiset.class, 
field.getTypeInformation().getTypeClass());
-                       } else if(name.equals("fancyArray")) {
-                               if(strArraySeen) {
-                                       Assert.fail("already seen");
-                               }
-                               strArraySeen = true;
-                               
Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, 
field.getTypeInformation());
-                               Assert.assertEquals(String[].class, 
field.getTypeInformation().getTypeClass());
-                       } else if(Arrays.asList("date", 
"someNumberWithÜnicödeNäme", "someFloat", "word", "nothing", 
"hadoopCitizen", "collection").contains(name)) {
-                               // ignore these, they are inherited from the 
ComplexNestedClass
-                       }
-                       else {
-                               Assert.fail("field "+field+" is not expected");
-                       }
-               }
-               Assert.assertTrue("Field was not present", arrayListSeen);
-               Assert.assertTrue("Field was not present", multisetSeen);
-               Assert.assertTrue("Field was not present", strArraySeen);
-       }
-
-       @Test
-       public void testPojoExtendingTuple() {
-               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(FromTuple.class);
-               checkFromTuplePojo(typeForClass);
-
-               FromTuple ft = new FromTuple();
-               ft.f0 = ""; ft.f1 = ""; ft.f2 = 0L;
-               TypeInformation<?> typeForObject = 
TypeExtractor.getForObject(ft);
-               checkFromTuplePojo(typeForObject);
-       }
-
-       private void checkFromTuplePojo(TypeInformation<?> typeInformation) {
-               Assert.assertTrue(typeInformation instanceof PojoTypeInfo<?>);
-               Assert.assertEquals(4, typeInformation.getTotalFields());
-               PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeInformation;
-               for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
-                       PojoField field = pojoTypeForClass.getPojoFieldAt(i);
-                       String name = field.getField().getName();
-                       if(name.equals("special")) {
-                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
-                       } else if(name.equals("f0") || name.equals("f1")) {
-                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
-                       } else if(name.equals("f2")) {
-                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation());
-                       } else {
-                               Assert.fail("unexpected field");
-                       }
-               }
-       }
-
-       @Test
-       public void testPojoWithGenerics() {
-               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(ParentSettingGenerics.class);
-               Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
-               PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeForClass;
-               for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
-                       PojoField field = pojoTypeForClass.getPojoFieldAt(i);
-                       String name = field.getField().getName();
-                       if(name.equals("field1")) {
-                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
-                       } else if (name.equals("field2")) {
-                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation());
-                       } else if (name.equals("field3")) {
-                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
-                       } else if (name.equals("key")) {
-                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
-                       } else {
-                               Assert.fail("Unexpected field "+field);
-                       }
-               }
-       }
-
-       /**
-        * Test if the TypeExtractor is accepting untyped generics,
-        * making them GenericTypes
-        */
-       @Test
-       public void testPojoWithGenericsSomeFieldsGeneric() {
-               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(PojoWithGenerics.class);
-               Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
-               PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeForClass;
-               for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
-                       PojoField field = pojoTypeForClass.getPojoFieldAt(i);
-                       String name = field.getField().getName();
-                       if(name.equals("field1")) {
-                               Assert.assertEquals(new 
GenericTypeInfo<Object>(Object.class), field.getTypeInformation());
-                       } else if (name.equals("field2")) {
-                               Assert.assertEquals(new 
GenericTypeInfo<Object>(Object.class), field.getTypeInformation());
-                       } else if (name.equals("key")) {
-                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
-                       } else {
-                               Assert.fail("Unexpected field "+field);
-                       }
-               }
-       }
-
-
-       @Test
-       public void testPojoWithComplexHierarchy() {
-               TypeInformation<?> typeForClass = 
TypeExtractor.createTypeInfo(ComplexHierarchyTop.class);
-               Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
-               PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) 
typeForClass;
-               for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
-                       PojoField field = pojoTypeForClass.getPojoFieldAt(i);
-                       String name = field.getField().getName();
-                       if(name.equals("field1")) {
-                               Assert.assertTrue(field.getTypeInformation() 
instanceof PojoTypeInfo<?>); // From tuple is pojo (not tuple type!)
-                       } else if (name.equals("field2")) {
-                               Assert.assertTrue(field.getTypeInformation() 
instanceof TupleTypeInfo<?>);
-                               Assert.assertTrue( 
((TupleTypeInfo<?>)field.getTypeInformation()).getTypeAt(0).equals(BasicTypeInfo.STRING_TYPE_INFO)
 );
-                       } else if (name.equals("key")) {
-                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
-                       } else {
-                               Assert.fail("Unexpected field "+field);
-                       }
-               }
-       }
-
-
-       public static class Vertex<K, V> {
-
-               private K key1;
-               private K key2;
-               private V value;
-
-               public Vertex() {}
-
-               public Vertex(K key, V value) {
-                       this.key1 = key;
-                       this.key2 = key;
-                       this.value = value;
-               }
-
-               public Vertex(K key1, K key2, V value) {
-                       this.key1 = key1;
-                       this.key2 = key2;
-                       this.value = value;
-               }
-
-               public void setKey1(K key1) {
-                       this.key1 = key1;
-               }
-
-               public void setKey2(K key2) {
-                       this.key2 = key2;
-               }
-
-               public K getKey1() {
-                       return key1;
-               }
-
-               public K getKey2() {
-                       return key2;
-               }
-
-               public void setValue(V value) {
-                       this.value = value;
-               }
-
-               public V getValue() {
-                       return value;
-               }
-       }
-
-       public static class VertexTyped extends Vertex<Long, Double>{
-               public VertexTyped(Long l, Double d) {
-                       super(l, d);
-               }
-               public VertexTyped() {
-               }
-       }
-
-       @Test
-       public void testGetterSetterWithVertex() {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.fromElements(new VertexTyped(0L, 3.0), new VertexTyped(1L, 
1.0));
-       }
-
-       public static class MyMapper<T> implements 
MapFunction<PojoWithGenerics<Long, T>, PojoWithGenerics<T,T>> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public PojoWithGenerics<T, T> map(PojoWithGenerics<Long, T> 
value)
-                               throws Exception {
-                       return null;
-               }
-       }
-
-       @SuppressWarnings({ "unchecked", "rawtypes" })
-       @Test
-       public void testGenericPojoTypeInference1() {
-               MapFunction<?, ?> function = new MyMapper<String>();
-
-               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation)
-                               
TypeInfoParser.parse("org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest$PojoWithGenerics<key=int,field1=Long,field2=String>"));
-               Assert.assertTrue(ti instanceof PojoTypeInfo<?>);
-               PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
-               for(int i = 0; i < pti.getArity(); i++) {
-                       PojoField field = pti.getPojoFieldAt(i);
-                       String name = field.getField().getName();
-                       if(name.equals("field1")) {
-                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
-                       } else if (name.equals("field2")) {
-                               
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
-                       } else if (name.equals("key")) {
-                               
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
-                       } else {
-                               Assert.fail("Unexpected field "+field);
-                       }
-               }
-       }
-
-       public static class PojoTuple<A, B, C> extends Tuple3<B, C, Long> {
-               private static final long serialVersionUID = 1L;
-
-               public A extraField;
-       }
-
-       public static class MyMapper2<D, E> implements MapFunction<Tuple2<E, 
D>, PojoTuple<E, D, D>> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public PojoTuple<E, D, D> map(Tuple2<E, D> value) throws 
Exception {
-                       return null;
-               }
-       }
-
-       @SuppressWarnings({ "unchecked", "rawtypes" })
-       @Test
-       public void testGenericPojoTypeInference2() {
-               MapFunction<?, ?> function = new MyMapper2<Boolean, 
Character>();
-
-               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation)
-                               
TypeInfoParser.parse("Tuple2<Character,Boolean>"));
-               Assert.assertTrue(ti instanceof PojoTypeInfo<?>);
-               PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
-               for(int i = 0; i < pti.getArity(); i++) {
-                       PojoField field = pti.getPojoFieldAt(i);
-                       String name = field.getField().getName();
-                       if(name.equals("extraField")) {
-                               
Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, field.getTypeInformation());
-                       } else if (name.equals("f0")) {
-                               
Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, 
field.getTypeInformation());
-                       } else if (name.equals("f1")) {
-                               
Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, 
field.getTypeInformation());
-                       } else if (name.equals("f2")) {
-                               
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation());
-                       } else {
-                               Assert.fail("Unexpected field "+field);
-                       }
-               }
-       }
-
-       public static class MyMapper3<D, E> implements MapFunction<PojoTuple<E, 
D, D>, Tuple2<E, D>> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public Tuple2<E, D> map(PojoTuple<E, D, D> value) throws 
Exception {
-                       return null;
-               }
-       }
-
-       @SuppressWarnings({ "unchecked", "rawtypes" })
-       @Test
-       public void testGenericPojoTypeInference3() {
-               MapFunction<?, ?> function = new MyMapper3<Boolean, 
Character>();
-
-               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation)
-                               
TypeInfoParser.parse("org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest$PojoTuple<extraField=char,f0=boolean,f1=boolean,f2=long>"));
-               Assert.assertTrue(ti instanceof TupleTypeInfo<?>);
-               TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
-               Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, 
tti.getTypeAt(0));
-               Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, 
tti.getTypeAt(1));
-       }
-
-       public static class PojoWithParameterizedFields1<Z> {
-               public Tuple2<Z, Z> field;
-       }
-
-       public static class MyMapper4<A> implements 
MapFunction<PojoWithParameterizedFields1<A>, A> {
-               private static final long serialVersionUID = 1L;
-               @Override
-               public A map(PojoWithParameterizedFields1<A> value) throws 
Exception {
-                       return null;
-               }
-       }
-
-       @SuppressWarnings({ "unchecked", "rawtypes" })
-       @Test
-       public void testGenericPojoTypeInference4() {
-               MapFunction<?, ?> function = new MyMapper4<Byte>();
-
-               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation)
-                               
TypeInfoParser.parse("org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest$PojoWithParameterizedFields1<field=Tuple2<byte,byte>>"));
-               Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, ti);
-       }
-
-       public static class PojoWithParameterizedFields2<Z> {
-               public PojoWithGenerics<Z, Z> field;
-       }
-
-       public static class MyMapper5<A> implements 
MapFunction<PojoWithParameterizedFields2<A>, A> {
-               private static final long serialVersionUID = 1L;
-               @Override
-               public A map(PojoWithParameterizedFields2<A> value) throws 
Exception {
-                       return null;
-               }
-       }
-
-       @SuppressWarnings({ "unchecked", "rawtypes" })
-       @Test
-       public void testGenericPojoTypeInference5() {
-               MapFunction<?, ?> function = new MyMapper5<Byte>();
-
-               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation)
-                               
TypeInfoParser.parse("org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest$PojoWithParameterizedFields2<"
-                                               + 
"field=org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest$PojoWithGenerics<key=int,field1=byte,field2=byte>"
-                                               + ">"));
-               Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, ti);
-       }
-       
-       public static class PojoWithParameterizedFields3<Z> {
-               public Z[] field;
-       }
-
-       public static class MyMapper6<A> implements 
MapFunction<PojoWithParameterizedFields3<A>, A> {
-               private static final long serialVersionUID = 1L;
-               @Override
-               public A map(PojoWithParameterizedFields3<A> value) throws 
Exception {
-                       return null;
-               }
-       }
-
-       @SuppressWarnings({ "unchecked", "rawtypes" })
-       @Test
-       public void testGenericPojoTypeInference6() {
-               MapFunction<?, ?> function = new MyMapper6<Integer>();
-
-               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation)
-                               
TypeInfoParser.parse("org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest$PojoWithParameterizedFields3<"
-                                               + "field=int[]"
-                                               + ">"));
-               Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
-       }
-
-       public static class MyMapper7<A> implements 
MapFunction<PojoWithParameterizedFields4<A>, A> {
-               private static final long serialVersionUID = 1L;
-               @Override
-               public A map(PojoWithParameterizedFields4<A> value) throws 
Exception {
-                       return null;
-               }
-       }
-
-       public static class PojoWithParameterizedFields4<Z> {
-               public Tuple1<Z>[] field;
-       }
-
-       @SuppressWarnings({ "unchecked", "rawtypes" })
-       @Test
-       public void testGenericPojoTypeInference7() {
-               MapFunction<?, ?> function = new MyMapper7<Integer>();
-
-               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation)
-                               
TypeInfoParser.parse("org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest$PojoWithParameterizedFields4<"
-                                               + "field=Tuple1<int>[]"
-                                               + ">"));
-               Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
-       }
-
-       public static class RecursivePojo1 {
-               public RecursivePojo1 field;
-       }
-
-       public static class RecursivePojo2 {
-               public Tuple1<RecursivePojo2> field;
-       }
-
-       public static class RecursivePojo3 {
-               public NestedPojo field;
-       }
-
-       public static class NestedPojo {
-               public RecursivePojo3 field;
-       }
-
-       @Test
-       public void testRecursivePojo1() {
-               TypeInformation<?> ti = 
TypeExtractor.createTypeInfo(RecursivePojo1.class);
-               Assert.assertTrue(ti instanceof PojoTypeInfo);
-               Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) 
ti).getPojoFieldAt(0).getTypeInformation().getClass());
-       }
-
-       @Test
-       public void testRecursivePojo2() {
-               TypeInformation<?> ti = 
TypeExtractor.createTypeInfo(RecursivePojo2.class);
-               Assert.assertTrue(ti instanceof PojoTypeInfo);
-               PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0);
-               Assert.assertTrue(pf.getTypeInformation() instanceof 
TupleTypeInfo);
-               Assert.assertEquals(GenericTypeInfo.class, ((TupleTypeInfo) 
pf.getTypeInformation()).getTypeAt(0).getClass());
-       }
-
-       @Test
-       public void testRecursivePojo3() {
-               TypeInformation<?> ti = 
TypeExtractor.createTypeInfo(RecursivePojo3.class);
-               Assert.assertTrue(ti instanceof PojoTypeInfo);
-               PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0);
-               Assert.assertTrue(pf.getTypeInformation() instanceof 
PojoTypeInfo);
-               Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) 
pf.getTypeInformation()).getPojoFieldAt(0).getTypeInformation().getClass());
-       }
-
-       public static class FooBarPojo {
-               public int foo, bar;
-               public FooBarPojo() {}
-       }
-
-       public static class DuplicateMapper implements MapFunction<FooBarPojo, 
Tuple2<FooBarPojo, FooBarPojo>> {
-               @Override
-               public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) 
throws Exception {
-                       return null;
-               }
-       }
-
-       @Test
-       public void testDualUseOfPojo() {
-               MapFunction<?, ?> function = new DuplicateMapper();
-               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation) 
TypeExtractor.createTypeInfo(FooBarPojo.class));
-               Assert.assertTrue(ti instanceof TupleTypeInfo);
-               TupleTypeInfo<?> tti = ((TupleTypeInfo) ti);
-               Assert.assertTrue(tti.getTypeAt(0) instanceof PojoTypeInfo);
-               Assert.assertTrue(tti.getTypeAt(1) instanceof PojoTypeInfo);
-       }
-}

Reply via email to