http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
deleted file mode 100644
index de24956..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ /dev/null
@@ -1,592 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-
-public final class PojoSerializer<T> extends TypeSerializer<T> {
-
-       // Flags for the header
-       private static byte IS_NULL = 1;
-       private static byte NO_SUBCLASS = 2;
-       private static byte IS_SUBCLASS = 4;
-       private static byte IS_TAGGED_SUBCLASS = 8;
-
-       private static final long serialVersionUID = 1L;
-
-       private final Class<T> clazz;
-
-       private final TypeSerializer<Object>[] fieldSerializers;
-
-       private final int numFields;
-
-       private final Map<Class<?>, Integer> registeredClasses;
-
-       private final TypeSerializer<?>[] registeredSerializers;
-
-       private final ExecutionConfig executionConfig;
-
-       private transient Map<Class<?>, TypeSerializer<?>> 
subclassSerializerCache;
-       private transient ClassLoader cl;
-       // We need to handle these ourselves in writeObject()/readObject()
-       private transient Field[] fields;
-
-       @SuppressWarnings("unchecked")
-       public PojoSerializer(
-                       Class<T> clazz,
-                       TypeSerializer<?>[] fieldSerializers,
-                       Field[] fields,
-                       ExecutionConfig executionConfig) {
-
-               this.clazz = Preconditions.checkNotNull(clazz);
-               this.fieldSerializers = (TypeSerializer<Object>[]) 
Preconditions.checkNotNull(fieldSerializers);
-               this.fields = Preconditions.checkNotNull(fields);
-               this.numFields = fieldSerializers.length;
-               this.executionConfig = 
Preconditions.checkNotNull(executionConfig);
-
-               LinkedHashSet<Class<?>> registeredPojoTypes = 
executionConfig.getRegisteredPojoTypes();
-
-               for (int i = 0; i < numFields; i++) {
-                       this.fields[i].setAccessible(true);
-               }
-
-               cl = Thread.currentThread().getContextClassLoader();
-
-               subclassSerializerCache = new HashMap<Class<?>, 
TypeSerializer<?>>();
-
-               // We only want those classes that are not our own class and 
are actually sub-classes.
-               List<Class<?>> cleanedTaggedClasses = new 
ArrayList<Class<?>>(registeredPojoTypes.size());
-               for (Class<?> registeredClass: registeredPojoTypes) {
-                       if (registeredClass.equals(clazz)) {
-                               continue;
-                       }
-                       if (!clazz.isAssignableFrom(registeredClass)) {
-                               continue;
-                       }
-                       cleanedTaggedClasses.add(registeredClass);
-
-               }
-               this.registeredClasses = new LinkedHashMap<Class<?>, 
Integer>(cleanedTaggedClasses.size());
-               registeredSerializers = new 
TypeSerializer[cleanedTaggedClasses.size()];
-
-               int id = 0;
-               for (Class<?> registeredClass: cleanedTaggedClasses) {
-                       this.registeredClasses.put(registeredClass, id);
-                       TypeInformation<?> typeInfo = 
TypeExtractor.createTypeInfo(registeredClass);
-                       registeredSerializers[id] = 
typeInfo.createSerializer(executionConfig);
-
-                       id++;
-               }
-       }
-
-       private void writeObject(ObjectOutputStream out)
-                       throws IOException, ClassNotFoundException {
-               out.defaultWriteObject();
-               out.writeInt(fields.length);
-               for (Field field: fields) {
-                       out.writeObject(field.getDeclaringClass());
-                       out.writeUTF(field.getName());
-               }
-       }
-
-       private void readObject(ObjectInputStream in)
-                       throws IOException, ClassNotFoundException {
-               in.defaultReadObject();
-               int numFields = in.readInt();
-               fields = new Field[numFields];
-               for (int i = 0; i < numFields; i++) {
-                       Class<?> clazz = (Class<?>)in.readObject();
-                       String fieldName = in.readUTF();
-                       fields[i] = null;
-                       // try superclasses as well
-                       while (clazz != null) {
-                               try {
-                                       fields[i] = 
clazz.getDeclaredField(fieldName);
-                                       fields[i].setAccessible(true);
-                                       break;
-                               } catch (NoSuchFieldException e) {
-                                       clazz = clazz.getSuperclass();
-                               }
-                       }
-                       if (fields[i] == null) {
-                               throw new RuntimeException("Class resolved at 
TaskManager is not compatible with class read during Plan setup."
-                                               + " (" + fieldName + ")");
-                       }
-               }
-
-               cl = Thread.currentThread().getContextClassLoader();
-               subclassSerializerCache = new HashMap<Class<?>, 
TypeSerializer<?>>();
-       }
-
-       private TypeSerializer<?> getSubclassSerializer(Class<?> subclass) {
-               TypeSerializer<?> result = 
subclassSerializerCache.get(subclass);
-               if (result == null) {
-
-                       TypeInformation<?> typeInfo = 
TypeExtractor.createTypeInfo(subclass);
-                       result = typeInfo.createSerializer(executionConfig);
-                       if (result instanceof PojoSerializer) {
-                               PojoSerializer<?> subclassSerializer = 
(PojoSerializer<?>) result;
-                               subclassSerializer.copyBaseFieldOrder(this);
-                       }
-                       subclassSerializerCache.put(subclass, result);
-
-               }
-               return result;
-       }
-
-       @SuppressWarnings("unused")
-       private boolean hasField(Field f) {
-               for (Field field: fields) {
-                       if (f.equals(field)) {
-                               return true;
-                       }
-               }
-               return false;
-       }
-
-       private void copyBaseFieldOrder(PojoSerializer<?> baseSerializer) {
-               // do nothing for now, but in the future, adapt subclass 
serializer to have same
-               // ordering as base class serializer so that binary comparison 
on base class fields
-               // can work
-       }
-       
-       @Override
-       public boolean isImmutableType() {
-               return false;
-       }
-
-       @Override
-       public PojoSerializer<T> duplicate() {
-               boolean stateful = false;
-               TypeSerializer<?>[] duplicateFieldSerializers = new 
TypeSerializer[fieldSerializers.length];
-
-               for (int i = 0; i < fieldSerializers.length; i++) {
-                       duplicateFieldSerializers[i] = 
fieldSerializers[i].duplicate();
-                       if (duplicateFieldSerializers[i] != 
fieldSerializers[i]) {
-                               // at least one of them is stateful
-                               stateful = true;
-                       }
-               }
-
-               if (stateful) {
-                       return new PojoSerializer<T>(clazz, 
duplicateFieldSerializers, fields, executionConfig);
-               } else {
-                       return this;
-               }
-       }
-
-       
-       @Override
-       public T createInstance() {
-               if (clazz.isInterface() || 
Modifier.isAbstract(clazz.getModifiers())) {
-                       return null;
-               }
-               try {
-                       T t = clazz.newInstance();
-                       initializeFields(t);
-                       return t;
-               }
-               catch (Exception e) {
-                       throw new RuntimeException("Cannot instantiate class.", 
e);
-               }
-       }
-
-       protected void initializeFields(T t) {
-               for (int i = 0; i < numFields; i++) {
-                       try {
-                               fields[i].set(t, 
fieldSerializers[i].createInstance());
-                       } catch (IllegalAccessException e) {
-                               throw new RuntimeException("Cannot initialize 
fields.", e);
-                       }
-               }
-       }
-
-       @Override
-       @SuppressWarnings({"unchecked", "rawtypes"})
-       public T copy(T from) {
-               if (from == null) {
-                       return null;
-               }
-
-               Class<?> actualType = from.getClass();
-               if (actualType == clazz) {
-                       T target;
-                       try {
-                               target = (T) from.getClass().newInstance();
-                       }
-                       catch (Throwable t) {
-                               throw new RuntimeException("Cannot instantiate 
class.", t);
-                       }
-                       // no subclass
-                       try {
-                               for (int i = 0; i < numFields; i++) {
-                                       Object value = fields[i].get(from);
-                                       if (value != null) {
-                                               Object copy = 
fieldSerializers[i].copy(value);
-                                               fields[i].set(target, copy);
-                                       }
-                                       else {
-                                               fields[i].set(target, null);
-                                       }
-                               }
-                       } catch (IllegalAccessException e) {
-                               throw new RuntimeException("Error during POJO 
copy, this should not happen since we check the fields before.");
-
-                       }
-                       return target;
-               } else {
-                       // subclass
-                       TypeSerializer subclassSerializer = 
getSubclassSerializer(actualType);
-                       return (T) subclassSerializer.copy(from);
-               }
-       }
-       
-       @Override
-       @SuppressWarnings({"unchecked", "rawtypes"})
-       public T copy(T from, T reuse) {
-               if (from == null) {
-                       return null;
-               }
-
-               Class<?> actualType = from.getClass();
-               if (reuse == null || actualType != reuse.getClass()) {
-                       // cannot reuse, do a non-reuse copy
-                       return copy(from);
-               }
-
-               if (actualType == clazz) {
-                       try {
-                               for (int i = 0; i < numFields; i++) {
-                                       Object value = fields[i].get(from);
-                                       if (value != null) {
-                                               Object reuseValue = 
fields[i].get(reuse);
-                                               Object copy;
-                                               if(reuseValue != null) {
-                                                       copy = 
fieldSerializers[i].copy(value, reuseValue);
-                                               }
-                                               else {
-                                                       copy = 
fieldSerializers[i].copy(value);
-                                               }
-                                               fields[i].set(reuse, copy);
-                                       }
-                                       else {
-                                               fields[i].set(reuse, null);
-                                       }
-                               }
-                       } catch (IllegalAccessException e) {
-                               throw new RuntimeException("Error during POJO 
copy, this should not happen since we check the fields" + "before.");
-                       }
-               } else {
-                       TypeSerializer subclassSerializer = 
getSubclassSerializer(actualType);
-                       reuse = (T) subclassSerializer.copy(from, reuse);
-               }
-
-               return reuse;
-       }
-
-       @Override
-       public int getLength() {
-               return -1;
-       }
-
-
-       @Override
-       @SuppressWarnings({"unchecked", "rawtypes"})
-       public void serialize(T value, DataOutputView target) throws 
IOException {
-               int flags = 0;
-               // handle null values
-               if (value == null) {
-                       flags |= IS_NULL;
-                       target.writeByte(flags);
-                       return;
-               }
-
-               Integer subclassTag = -1;
-               Class<?> actualClass = value.getClass();
-               TypeSerializer subclassSerializer = null;
-               if (clazz != actualClass) {
-                       subclassTag = registeredClasses.get(actualClass);
-                       if (subclassTag != null) {
-                               flags |= IS_TAGGED_SUBCLASS;
-                               subclassSerializer = 
registeredSerializers[subclassTag];
-                       } else {
-                               flags |= IS_SUBCLASS;
-                               subclassSerializer = 
getSubclassSerializer(actualClass);
-                       }
-               } else {
-                       flags |= NO_SUBCLASS;
-               }
-
-               target.writeByte(flags);
-
-               if ((flags & IS_SUBCLASS) != 0) {
-                       target.writeUTF(actualClass.getName());
-               } else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
-                       target.writeByte(subclassTag);
-               }
-
-
-               if ((flags & NO_SUBCLASS) != 0) {
-                       try {
-                               for (int i = 0; i < numFields; i++) {
-                                       Object o = fields[i].get(value);
-                                       if (o == null) {
-                                               target.writeBoolean(true); // 
null field handling
-                                       } else {
-                                               target.writeBoolean(false);
-                                               
fieldSerializers[i].serialize(o, target);
-                                       }
-                               }
-                       } catch (IllegalAccessException e) {
-                               throw new RuntimeException("Error during POJO 
copy, this should not happen since we check the fields" + "before.");
-
-                       }
-               } else {
-                       // subclass
-                       if (subclassSerializer != null) {
-                               subclassSerializer.serialize(value, target);
-                       }
-               }
-       }
-
-       @Override
-       @SuppressWarnings({"unchecked", "rawtypes"})
-       public T deserialize(DataInputView source) throws IOException {
-               int flags = source.readByte();
-               if((flags & IS_NULL) != 0) {
-                       return null;
-               }
-
-               T target;
-
-               Class<?> actualSubclass = null;
-               TypeSerializer subclassSerializer = null;
-
-               if ((flags & IS_SUBCLASS) != 0) {
-                       String subclassName = source.readUTF();
-                       try {
-                               actualSubclass = Class.forName(subclassName, 
true, cl);
-                       } catch (ClassNotFoundException e) {
-                               throw new RuntimeException("Cannot instantiate 
class.", e);
-                       }
-                       subclassSerializer = 
getSubclassSerializer(actualSubclass);
-                       target = (T) subclassSerializer.createInstance();
-                       // also initialize fields for which the subclass 
serializer is not responsible
-                       initializeFields(target);
-               } else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
-
-                       int subclassTag = source.readByte();
-                       subclassSerializer = registeredSerializers[subclassTag];
-                       target = (T) subclassSerializer.createInstance();
-                       // also initialize fields for which the subclass 
serializer is not responsible
-                       initializeFields(target);
-               } else {
-                       target = createInstance();
-               }
-
-               if ((flags & NO_SUBCLASS) != 0) {
-                       try {
-                               for (int i = 0; i < numFields; i++) {
-                                       boolean isNull = source.readBoolean();
-                                       if (isNull) {
-                                               fields[i].set(target, null);
-                                       } else {
-                                               Object field = 
fieldSerializers[i].deserialize(source);
-                                               fields[i].set(target, field);
-                                       }
-                               }
-                       } catch (IllegalAccessException e) {
-                               throw new RuntimeException("Error during POJO 
copy, this should not happen since we check the fields" + "before.");
-
-                       }
-               } else {
-                       if (subclassSerializer != null) {
-                               target = (T) 
subclassSerializer.deserialize(target, source);
-                       }
-               }
-               return target;
-       }
-       
-       @Override
-       @SuppressWarnings({"unchecked", "rawtypes"})
-       public T deserialize(T reuse, DataInputView source) throws IOException {
-
-               // handle null values
-               int flags = source.readByte();
-               if((flags & IS_NULL) != 0) {
-                       return null;
-               }
-
-               Class<?> subclass = null;
-               TypeSerializer subclassSerializer = null;
-               if ((flags & IS_SUBCLASS) != 0) {
-                       String subclassName = source.readUTF();
-                       try {
-                               subclass = Class.forName(subclassName, true, 
cl);
-                       } catch (ClassNotFoundException e) {
-                               throw new RuntimeException("Cannot instantiate 
class.", e);
-                       }
-                       subclassSerializer = getSubclassSerializer(subclass);
-
-                       if (reuse == null || subclass != reuse.getClass()) {
-                               // cannot reuse
-                               reuse = (T) subclassSerializer.createInstance();
-                               // also initialize fields for which the 
subclass serializer is not responsible
-                               initializeFields(reuse);
-                       }
-               } else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
-                       int subclassTag = source.readByte();
-                       subclassSerializer = registeredSerializers[subclassTag];
-
-                       if (reuse == null || 
((PojoSerializer)subclassSerializer).clazz != reuse.getClass()) {
-                               // cannot reuse
-                               reuse = (T) subclassSerializer.createInstance();
-                               // also initialize fields for which the 
subclass serializer is not responsible
-                               initializeFields(reuse);
-                       }
-               } else {
-                       if (reuse == null || clazz != reuse.getClass()) {
-                               reuse = createInstance();
-                       }
-               }
-
-               if ((flags & NO_SUBCLASS) != 0) {
-                       try {
-                               for (int i = 0; i < numFields; i++) {
-                                       boolean isNull = source.readBoolean();
-                                       if (isNull) {
-                                               fields[i].set(reuse, null);
-                                       } else {
-                                               Object field;
-
-                                               Object reuseField = 
fields[i].get(reuse);
-                                               if(reuseField != null) {
-                                                       field = 
fieldSerializers[i].deserialize(reuseField, source);
-                                               }
-                                               else {
-                                                       field = 
fieldSerializers[i].deserialize(source);
-                                               }
-
-                                               fields[i].set(reuse, field);
-                                       }
-                               }
-                       } catch (IllegalAccessException e) {
-                               throw new RuntimeException(
-                                               "Error during POJO copy, this 
should not happen since we check the fields before.");
-                       }
-               } else {
-                       if (subclassSerializer != null) {
-                               reuse = (T) 
subclassSerializer.deserialize(reuse, source);
-                       }
-               }
-
-               return reuse;
-       }
-
-       @Override
-       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-               // copy the flags
-               int flags = source.readByte();
-               target.writeByte(flags);
-
-               if ((flags & IS_NULL) != 0) {
-                       // is a null value, nothing further to copy
-                       return;
-               }
-
-               TypeSerializer<?> subclassSerializer = null;
-               if ((flags & IS_SUBCLASS) != 0) {
-                       String className = source.readUTF();
-                       target.writeUTF(className);
-                       try {
-                               Class<?> subclass = Class.forName(className, 
true, Thread.currentThread()
-                                               .getContextClassLoader());
-                               subclassSerializer = 
getSubclassSerializer(subclass);
-                       } catch (ClassNotFoundException e) {
-                               throw new RuntimeException("Cannot instantiate 
class.", e);
-                       }
-               } else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
-                       int subclassTag = source.readByte();
-                       target.writeByte(subclassTag);
-                       subclassSerializer = registeredSerializers[subclassTag];
-               }
-
-               if ((flags & NO_SUBCLASS) != 0) {
-                       for (int i = 0; i < numFields; i++) {
-                               boolean isNull = source.readBoolean();
-                               target.writeBoolean(isNull);
-                               if (!isNull) {
-                                       fieldSerializers[i].copy(source, 
target);
-                               }
-                       }
-               } else {
-                       if (subclassSerializer != null) {
-                               subclassSerializer.copy(source, target);
-                       }
-               }
-       }
-       
-       @Override
-       public int hashCode() {
-               return 31 * (31 * Arrays.hashCode(fieldSerializers) + 
Arrays.hashCode(registeredSerializers)) +
-                       Objects.hash(clazz, numFields, registeredClasses);
-       }
-       
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof PojoSerializer) {
-                       PojoSerializer<?> other = (PojoSerializer<?>) obj;
-
-                       return other.canEqual(this) &&
-                               clazz == other.clazz &&
-                               Arrays.equals(fieldSerializers, 
other.fieldSerializers) &&
-                               Arrays.equals(registeredSerializers, 
other.registeredSerializers) &&
-                               numFields == other.numFields &&
-                               
registeredClasses.equals(other.registeredClasses);
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof PojoSerializer;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java
deleted file mode 100644
index 4b734a7..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.InstantiationUtil;
-
-public final class RuntimeComparatorFactory<T> implements 
TypeComparatorFactory<T>, java.io.Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-
-       private static final String CONFIG_KEY = "SER_DATA";
-
-       private TypeComparator<T> comparator;
-
-
-       public RuntimeComparatorFactory() {}
-
-       public RuntimeComparatorFactory(TypeComparator<T> comparator) {
-               this.comparator = comparator;
-       }
-
-       @Override
-       public void writeParametersToConfig(Configuration config) {
-               try {
-                       InstantiationUtil.writeObjectToConfig(comparator, 
config, CONFIG_KEY);
-               }
-               catch (Exception e) {
-                       throw new RuntimeException("Could not serialize 
comparator into the configuration.", e);
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public void readParametersFromConfig(Configuration config, ClassLoader 
cl) throws ClassNotFoundException {
-               try {
-                       comparator = (TypeComparator<T>) 
InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY, cl);
-               }
-               catch (ClassNotFoundException e) {
-                       throw e;
-               }
-               catch (Exception e) {
-                       throw new RuntimeException("Could not serialize 
serializer into the configuration.", e);
-               }
-       }
-
-       @Override
-       public TypeComparator<T> createComparator() {
-               if (comparator != null) {
-                       return comparator;
-               } else {
-                       throw new RuntimeException("ComparatorFactory has not 
been initialized from configuration.");
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
deleted file mode 100644
index 31e28f7..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.GenericPairComparator;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-
-public final class RuntimePairComparatorFactory<T1, T2>
-               implements TypePairComparatorFactory<T1, T2>, 
java.io.Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       @Override
-       public TypePairComparator<T1, T2> createComparator12(
-                       TypeComparator<T1> comparator1,
-                       TypeComparator<T2> comparator2) {
-               return new GenericPairComparator<T1, T2>(comparator1, 
comparator2);
-       }
-
-       @Override
-       public TypePairComparator<T2, T1> createComparator21(
-                       TypeComparator<T1> comparator1,
-                       TypeComparator<T2> comparator2) {
-               return new GenericPairComparator<T2, T1>(comparator2, 
comparator1);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
deleted file mode 100644
index 96aff73..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.InstantiationUtil;
-
-public final class RuntimeSerializerFactory<T> implements 
TypeSerializerFactory<T>, java.io.Serializable {
-
-       private static final long serialVersionUID = 1L;
-       
-
-       private static final String CONFIG_KEY_SER = "SER_DATA";
-
-       private static final String CONFIG_KEY_CLASS = "CLASS_DATA";
-
-       
-       private TypeSerializer<T> serializer;
-
-       private boolean firstSerializer = true;
-
-       private Class<T> clazz;
-
-       // Because we read the class from the TaskConfig and instantiate 
ourselves
-       public RuntimeSerializerFactory() {}
-
-       public RuntimeSerializerFactory(TypeSerializer<T> serializer, Class<T> 
clazz) {
-               if (serializer == null || clazz == null) {
-                       throw new NullPointerException();
-               }
-
-               this.clazz = clazz;
-               this.serializer = serializer;
-       }
-
-
-       @Override
-       public void writeParametersToConfig(Configuration config) {
-               try {
-                       InstantiationUtil.writeObjectToConfig(clazz, config, 
CONFIG_KEY_CLASS);
-                       InstantiationUtil.writeObjectToConfig(serializer, 
config, CONFIG_KEY_SER);
-               }
-               catch (Exception e) {
-                       throw new RuntimeException("Could not serialize 
serializer into the configuration.", e);
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public void readParametersFromConfig(Configuration config, ClassLoader 
cl) throws ClassNotFoundException {
-               if (config == null || cl == null) {
-                       throw new NullPointerException();
-               }
-               
-               try {
-                       this.clazz = (Class<T>) 
InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_CLASS, cl);
-                       this.serializer = (TypeSerializer<T>)  
InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_SER, cl);
-                       firstSerializer = true;
-               }
-               catch (ClassNotFoundException e) {
-                       throw e;
-               }
-               catch (Exception e) {
-                       throw new RuntimeException("Could not load deserializer 
from the configuration.", e);
-               }
-       }
-
-       @Override
-       public TypeSerializer<T> getSerializer() {
-               if (this.serializer != null) {
-                       if (firstSerializer) {
-                               firstSerializer = false;
-                               return this.serializer;
-                       } else {
-                               return this.serializer.duplicate();
-                       }
-               } else {
-                       throw new RuntimeException("SerializerFactory has not 
been initialized from configuration.");
-               }
-       }
-
-       @Override
-       public Class<T> getDataType() {
-               return clazz;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public int hashCode() {
-               return clazz.hashCode() ^ serializer.hashCode();
-       }
-       
-       @Override
-       public boolean equals(Object obj) {
-               if (obj != null && obj instanceof RuntimeSerializerFactory) {
-                       RuntimeSerializerFactory<?> other = 
(RuntimeSerializerFactory<?>) obj;
-                       
-                       return this.clazz == other.clazz &&
-                                       
this.serializer.equals(other.serializer);
-               } else {
-                       return false;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
deleted file mode 100644
index a06ff1a..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class Tuple0Serializer extends TupleSerializer<Tuple0> {
-       
-       private static final long serialVersionUID = 1278813169022975971L;
-
-       public static final Tuple0Serializer INSTANCE = new Tuple0Serializer();
-
-       // 
------------------------------------------------------------------------
-       
-       private Tuple0Serializer() {
-               super(Tuple0.class, new TypeSerializer<?>[0]);
-       }
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public Tuple0Serializer duplicate() {
-               return this;
-       }
-
-       @Override
-       public Tuple0 createInstance() {
-               return Tuple0.INSTANCE;
-       }
-
-       @Override
-       public Tuple0 createInstance(Object[] fields) {
-               if (fields == null || fields.length == 0) {
-                       return Tuple0.INSTANCE;
-               }
-
-               throw new UnsupportedOperationException(
-                               "Tuple0 cannot take any data, as it has zero 
fields.");
-       }
-
-       @Override
-       public Tuple0 copy(Tuple0 from) {
-               return from;
-       }
-
-       @Override
-       public Tuple0 copy(Tuple0 from, Tuple0 reuse) {
-               return reuse;
-       }
-
-       @Override
-       public int getLength() {
-               return 1;
-       }
-
-       @Override
-       public void serialize(Tuple0 record, DataOutputView target) throws 
IOException {
-               target.writeByte(42);
-       }
-
-       @Override
-       public Tuple0 deserialize(DataInputView source) throws IOException {
-               source.readByte();
-               return Tuple0.INSTANCE;
-       }
-
-       @Override
-       public Tuple0 deserialize(Tuple0 reuse, DataInputView source) throws 
IOException {
-               source.readByte();
-               return reuse;
-       }
-
-       @Override
-       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-               target.writeByte(source.readByte());
-       }
-
-       // 
------------------------------------------------------------------------
-
-
-       @Override
-       public int hashCode() {
-               return Tuple0Serializer.class.hashCode();
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof Tuple0Serializer) {
-                       Tuple0Serializer other = (Tuple0Serializer) obj;
-
-                       return other.canEqual(this);
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof Tuple0Serializer;
-       }
-
-       @Override
-       public String toString() {
-               return "Tuple0Serializer";
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
deleted file mode 100644
index 875ecc2..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.types.KeyFieldOutOfBoundsException;
-import org.apache.flink.types.NullFieldException;
-import org.apache.flink.types.NullKeyFieldException;
-
-
-public final class TupleComparator<T extends Tuple> extends 
TupleComparatorBase<T> {
-
-       private static final long serialVersionUID = 1L;
-       
-       public TupleComparator(int[] keyPositions, TypeComparator<?>[] 
comparators, TypeSerializer<?>[] serializers) {
-               super(keyPositions, comparators, serializers);
-       }
-       
-       private TupleComparator(TupleComparator<T> toClone) {
-               super(toClone);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //  Comparator Methods
-       // 
--------------------------------------------------------------------------------------------
-       
-       @SuppressWarnings("unchecked")
-       @Override
-       public int hash(T value) {
-               int i = 0;
-               try {
-                       int code = 
this.comparators[0].hash(value.getFieldNotNull(keyPositions[0]));
-                       for (i = 1; i < this.keyPositions.length; i++) {
-                               code *= HASH_SALT[i & 0x1F]; // salt code with 
(i % HASH_SALT.length)-th salt component
-                               code += 
this.comparators[i].hash(value.getFieldNotNull(keyPositions[i]));
-                       }
-                       return code;
-               }
-               catch (NullFieldException nfex) {
-                       throw new NullKeyFieldException(nfex);
-               }
-               catch (IndexOutOfBoundsException iobex) {
-                       throw new KeyFieldOutOfBoundsException(keyPositions[i]);
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public void setReference(T toCompare) {
-               int i = 0;
-               try {
-                       for (; i < this.keyPositions.length; i++) {
-                               
this.comparators[i].setReference(toCompare.getFieldNotNull(this.keyPositions[i]));
-                       }
-               }
-               catch (NullFieldException nfex) {
-                       throw new NullKeyFieldException(nfex);
-               }
-               catch (IndexOutOfBoundsException iobex) {
-                       throw new KeyFieldOutOfBoundsException(keyPositions[i]);
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public boolean equalToReference(T candidate) {
-               int i = 0;
-               try {
-                       for (; i < this.keyPositions.length; i++) {
-                               if 
(!this.comparators[i].equalToReference(candidate.getFieldNotNull(this.keyPositions[i])))
 {
-                                       return false;
-                               }
-                       }
-                       return true;
-               }
-               catch (NullFieldException nfex) {
-                       throw new NullKeyFieldException(nfex);
-               }
-               catch (IndexOutOfBoundsException iobex) {
-                       throw new KeyFieldOutOfBoundsException(keyPositions[i]);
-               }
-       }
-       
-       @SuppressWarnings("unchecked")
-       @Override
-       public int compare(T first, T second) {
-               int i = 0;
-               try {
-                       for (; i < keyPositions.length; i++) {
-                               int keyPos = keyPositions[i];
-                               int cmp = 
comparators[i].compare(first.getFieldNotNull(keyPos), 
second.getFieldNotNull(keyPos));
-
-                               if (cmp != 0) {
-                                       return cmp;
-                               }
-                       }
-                       return 0;
-               } 
-               catch (NullFieldException nfex) {
-                       throw new NullKeyFieldException(nfex);
-               }
-               catch (IndexOutOfBoundsException iobex) {
-                       throw new KeyFieldOutOfBoundsException(keyPositions[i]);
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public void putNormalizedKey(T value, MemorySegment target, int offset, 
int numBytes) {
-               int i = 0;
-               try {
-                       for (; i < this.numLeadingNormalizableKeys && numBytes 
> 0; i++) {
-                               int len = this.normalizedKeyLengths[i];
-                               len = numBytes >= len ? len : numBytes;
-                               
this.comparators[i].putNormalizedKey(value.getFieldNotNull(this.keyPositions[i]),
 target, offset, len);
-                               numBytes -= len;
-                               offset += len;
-                       }
-               } catch (NullFieldException nfex) {
-                       throw new NullKeyFieldException(nfex);
-               } catch (NullPointerException npex) {
-                       throw new NullKeyFieldException(this.keyPositions[i]);
-               }
-       }
-
-       @Override
-       public int extractKeys(Object record, Object[] target, int index) {
-               int localIndex = index;
-               for(int i = 0; i < comparators.length; i++) {
-                       localIndex += comparators[i].extractKeys(((Tuple) 
record).getField(keyPositions[i]), target, localIndex);
-               }
-               return localIndex - index;
-       }
-
-       public TypeComparator<T> duplicate() {
-               return new TupleComparator<T>(this);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
deleted file mode 100644
index 28169e5..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.KeyFieldOutOfBoundsException;
-import org.apache.flink.types.NullKeyFieldException;
-
-
-public abstract class TupleComparatorBase<T> extends 
CompositeTypeComparator<T> implements java.io.Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       /** key positions describe which fields are keys in what order */
-       protected int[] keyPositions;
-
-       /** comparators for the key fields, in the same order as the key fields 
*/
-       @SuppressWarnings("rawtypes")
-       protected TypeComparator[] comparators;
-
-       protected int[] normalizedKeyLengths;
-
-       protected int numLeadingNormalizableKeys;
-
-       protected int normalizableKeyPrefixLen;
-
-       protected boolean invertNormKey;
-
-
-       /** serializers to deserialize the first n fields for comparison */
-       @SuppressWarnings("rawtypes")
-       protected TypeSerializer[] serializers;
-
-       // cache for the deserialized field objects
-       protected transient Object[] deserializedFields1;
-       protected transient Object[] deserializedFields2;
-
-
-       @SuppressWarnings("unchecked")
-       public TupleComparatorBase(int[] keyPositions, TypeComparator<?>[] 
comparators, TypeSerializer<?>[] serializers) {
-               // set the default utils
-               this.keyPositions = keyPositions;
-               this.comparators = (TypeComparator<Object>[]) comparators;
-               this.serializers = (TypeSerializer<Object>[]) serializers;
-
-               // set up auxiliary fields for normalized key support
-               this.normalizedKeyLengths = new int[keyPositions.length];
-               int nKeys = 0;
-               int nKeyLen = 0;
-               boolean inverted = false;
-
-               for (int i = 0; i < this.keyPositions.length; i++) {
-                       TypeComparator<?> k = this.comparators[i];
-
-                       // as long as the leading keys support normalized keys, 
we can build up the composite key
-                       if (k.supportsNormalizedKey()) {
-                               if (i == 0) {
-                                       // the first comparator decides whether 
we need to invert the key direction
-                                       inverted = k.invertNormalizedKey();
-                               }
-                               else if (k.invertNormalizedKey() != inverted) {
-                                       // if a successor does not agree on the 
inversion direction, it cannot be part of the normalized key
-                                       break;
-                               }
-
-                               nKeys++;
-                               final int len = k.getNormalizeKeyLen();
-                               if (len < 0) {
-                                       throw new RuntimeException("Comparator 
" + k.getClass().getName() + " specifies an invalid length for the normalized 
key: " + len);
-                               }
-                               this.normalizedKeyLengths[i] = len;
-                               nKeyLen += len;
-
-                               if (nKeyLen < 0) {
-                                       // overflow, which means we are out of 
budget for normalized key space anyways
-                                       nKeyLen = Integer.MAX_VALUE;
-                                       break;
-                               }
-                       } else {
-                               break;
-                       }
-               }
-               this.numLeadingNormalizableKeys = nKeys;
-               this.normalizableKeyPrefixLen = nKeyLen;
-               this.invertNormKey = inverted;
-       }
-
-       protected TupleComparatorBase(TupleComparatorBase<T> toClone) {
-               privateDuplicate(toClone);
-       }
-
-       // We need this because we cannot call the cloning constructor from the
-       // ScalaTupleComparator
-       protected void privateDuplicate(TupleComparatorBase<T> toClone) {
-               // copy fields and serializer factories
-               this.keyPositions = toClone.keyPositions;
-
-               this.serializers = new 
TypeSerializer[toClone.serializers.length];
-               for (int i = 0; i < toClone.serializers.length; i++) {
-                       this.serializers[i] = 
toClone.serializers[i].duplicate();
-               }
-
-               this.comparators = new 
TypeComparator[toClone.comparators.length];
-               for (int i = 0; i < toClone.comparators.length; i++) {
-                       this.comparators[i] = 
toClone.comparators[i].duplicate();
-               }
-
-               this.normalizedKeyLengths = toClone.normalizedKeyLengths;
-               this.numLeadingNormalizableKeys = 
toClone.numLeadingNormalizableKeys;
-               this.normalizableKeyPrefixLen = 
toClone.normalizableKeyPrefixLen;
-               this.invertNormKey = toClone.invertNormKey;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //  Comparator Methods
-       // 
--------------------------------------------------------------------------------------------
-       
-       protected int[] getKeyPositions() {
-               return this.keyPositions;
-       }
-       
-       
-       @SuppressWarnings({ "rawtypes", "unchecked" })
-       @Override
-       public void getFlatComparator(List<TypeComparator> flatComparators) {
-               for(int i = 0; i < comparators.length; i++) {
-                       if(comparators[i] instanceof CompositeTypeComparator) {
-                               
((CompositeTypeComparator)comparators[i]).getFlatComparator(flatComparators);
-                       } else {
-                               flatComparators.add(comparators[i]);
-                       }
-               }
-       }       
-       // 
--------------------------------------------------------------------------------------------
-       //  Comparator Methods
-       // 
--------------------------------------------------------------------------------------------
-
-
-       @Override
-       public int compareToReference(TypeComparator<T> referencedComparator) {
-               TupleComparatorBase<T> other = (TupleComparatorBase<T>) 
referencedComparator;
-               
-               int i = 0;
-               try {
-                       for (; i < this.keyPositions.length; i++) {
-                               @SuppressWarnings("unchecked")
-                               int cmp = 
this.comparators[i].compareToReference(other.comparators[i]);
-                               if (cmp != 0) {
-                                       return cmp;
-                               }
-                       }
-                       return 0;
-               }
-               catch (NullPointerException npex) {
-                       throw new NullKeyFieldException(keyPositions[i]);
-               }
-               catch (IndexOutOfBoundsException iobex) {
-                       throw new KeyFieldOutOfBoundsException(keyPositions[i]);
-               }
-       }
-       
-       @SuppressWarnings("unchecked")
-       @Override
-       public int compareSerialized(DataInputView firstSource, DataInputView 
secondSource) throws IOException {
-               if (deserializedFields1 == null) {
-                       instantiateDeserializationUtils();
-               }
-               
-               int i = 0;
-               try {
-                       for (; i < serializers.length; i++) {
-                               deserializedFields1[i] = 
serializers[i].deserialize(deserializedFields1[i], firstSource);
-                               deserializedFields2[i] = 
serializers[i].deserialize(deserializedFields2[i], secondSource);
-                       }
-                       
-                       for (i = 0; i < keyPositions.length; i++) {
-                               int keyPos = keyPositions[i];
-                               int cmp = 
comparators[i].compare(deserializedFields1[keyPos], 
deserializedFields2[keyPos]);
-                               if (cmp != 0) {
-                                       return cmp;
-                               }
-                       }
-                       
-                       return 0;
-               } catch (NullPointerException npex) {
-                       throw new NullKeyFieldException(keyPositions[i]);
-               } catch (IndexOutOfBoundsException iobex) {
-                       throw new KeyFieldOutOfBoundsException(keyPositions[i], 
iobex);
-               }
-       }
-       
-       @Override
-       public boolean supportsNormalizedKey() {
-               return this.numLeadingNormalizableKeys > 0;
-       }
-
-       @Override
-       public int getNormalizeKeyLen() {
-               return this.normalizableKeyPrefixLen;
-       }
-
-       @Override
-       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-               return this.numLeadingNormalizableKeys < 
this.keyPositions.length ||
-                               this.normalizableKeyPrefixLen == 
Integer.MAX_VALUE ||
-                               this.normalizableKeyPrefixLen > keyBytes;
-       }
-
-       @Override
-       public boolean invertNormalizedKey() {
-               return this.invertNormKey;
-       }
-       
-       
-       @Override
-       public boolean supportsSerializationWithKeyNormalization() {
-               return false;
-       }
-       
-       @Override
-       public void writeWithKeyNormalization(T record, DataOutputView target) 
throws IOException {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public T readWithKeyDenormalization(T reuse, DataInputView source) 
throws IOException {
-               throw new UnsupportedOperationException();
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       protected final void instantiateDeserializationUtils() {
-               this.deserializedFields1 = new Object[this.serializers.length];
-               this.deserializedFields2 = new Object[this.serializers.length];
-               
-               for (int i = 0; i < this.serializers.length; i++) {
-                       this.deserializedFields1[i] = 
this.serializers[i].createInstance();
-                       this.deserializedFields2[i] = 
this.serializers[i].createInstance();
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * A sequence of prime numbers to be used for salting the computed hash 
values.
-        * Based on some empirical evidence, we are using a 32-element 
subsequence of the  
-        * OEIS sequence #A068652 (numbers such that every cyclic permutation 
is a prime).
-        * 
-        * @see <a 
href="http://en.wikipedia.org/wiki/List_of_prime_numbers";>http://en.wikipedia.org/wiki/List_of_prime_numbers</a>
-        * @see <a href="http://oeis.org/A068652";>http://oeis.org/A068652</a>
-        */
-       public static final int[] HASH_SALT = new int[] {
-               73   , 79   , 97   , 113  , 131  , 197  , 199  , 311   , 
-               337  , 373  , 719  , 733  , 919  , 971  , 991  , 1193  , 
-               1931 , 3119 , 3779 , 7793 , 7937 , 9311 , 9377 , 11939 , 
-               19391, 19937, 37199, 39119, 71993, 91193, 93719, 93911 };
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
deleted file mode 100644
index 0897063..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.NullFieldException;
-
-
-public class TupleSerializer<T extends Tuple> extends TupleSerializerBase<T> {
-
-       private static final long serialVersionUID = 1L;
-       
-       public TupleSerializer(Class<T> tupleClass, TypeSerializer<?>[] 
fieldSerializers) {
-               super(tupleClass, fieldSerializers);
-       }
-
-       @Override
-       public TupleSerializer<T> duplicate() {
-               boolean stateful = false;
-               TypeSerializer<?>[] duplicateFieldSerializers = new 
TypeSerializer<?>[fieldSerializers.length];
-
-               for (int i = 0; i < fieldSerializers.length; i++) {
-                       duplicateFieldSerializers[i] = 
fieldSerializers[i].duplicate();
-                       if (duplicateFieldSerializers[i] != 
fieldSerializers[i]) {
-                               // at least one of them is stateful
-                               stateful = true;
-                       }
-               }
-
-               if (stateful) {
-                       return new TupleSerializer<T>(tupleClass, 
duplicateFieldSerializers);
-               } else {
-                       return this;
-               }
-       }
-
-       @Override
-       public T createInstance() {
-               try {
-                       T t = tupleClass.newInstance();
-               
-                       for (int i = 0; i < arity; i++) {
-                               
t.setField(fieldSerializers[i].createInstance(), i);
-                       }
-                       
-                       return t;
-               }
-               catch (Exception e) {
-                       throw new RuntimeException("Cannot instantiate tuple.", 
e);
-               }
-       }
-
-       @Override
-       public T createInstance(Object[] fields) {
-
-               try {
-                       T t = tupleClass.newInstance();
-
-                       for (int i = 0; i < arity; i++) {
-                               t.setField(fields[i], i);
-                       }
-
-                       return t;
-               }
-               catch (Exception e) {
-                       throw new RuntimeException("Cannot instantiate tuple.", 
e);
-               }
-       }
-
-       @Override
-       public T createOrReuseInstance(Object[] fields, T reuse) {
-               for (int i = 0; i < arity; i++) {
-                       reuse.setField(fields[i], i);
-               }
-               return reuse;
-       }
-
-       @Override
-       public T copy(T from) {
-               T target = instantiateRaw();
-               for (int i = 0; i < arity; i++) {
-                       Object copy = 
fieldSerializers[i].copy(from.getField(i));
-                       target.setField(copy, i);
-               }
-               return target;
-       }
-       
-       @Override
-       public T copy(T from, T reuse) {
-               for (int i = 0; i < arity; i++) {
-                       Object copy = 
fieldSerializers[i].copy(from.getField(i), reuse.getField(i));
-                       reuse.setField(copy, i);
-               }
-               
-               return reuse;
-       }
-
-       @Override
-       public void serialize(T value, DataOutputView target) throws 
IOException {
-               for (int i = 0; i < arity; i++) {
-                       Object o = value.getField(i);
-                       try {
-                               fieldSerializers[i].serialize(o, target);
-                       } catch (NullPointerException npex) {
-                               throw new NullFieldException(i, npex);
-                       }
-               }
-       }
-
-       @Override
-       public T deserialize(DataInputView source) throws IOException {
-               T tuple = instantiateRaw();
-               for (int i = 0; i < arity; i++) {
-                       Object field = fieldSerializers[i].deserialize(source);
-                       tuple.setField(field, i);
-               }
-               return tuple;
-       }
-       
-       @Override
-       public T deserialize(T reuse, DataInputView source) throws IOException {
-               for (int i = 0; i < arity; i++) {
-                       Object field = 
fieldSerializers[i].deserialize(reuse.getField(i), source);
-                       reuse.setField(field, i);
-               }
-               return reuse;
-       }
-       
-       private T instantiateRaw() {
-               try {
-                       return tupleClass.newInstance();
-               }
-               catch (Exception e) {
-                       throw new RuntimeException("Cannot instantiate tuple.", 
e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
deleted file mode 100644
index fc657a1..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Objects;
-
-
-public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
-
-       private static final long serialVersionUID = 1L;
-
-       protected final Class<T> tupleClass;
-
-       protected final TypeSerializer<Object>[] fieldSerializers;
-
-       protected final int arity;
-
-       @SuppressWarnings("unchecked")
-       public TupleSerializerBase(Class<T> tupleClass, TypeSerializer<?>[] 
fieldSerializers) {
-               this.tupleClass = Preconditions.checkNotNull(tupleClass);
-               this.fieldSerializers = (TypeSerializer<Object>[]) 
Preconditions.checkNotNull(fieldSerializers);
-               this.arity = fieldSerializers.length;
-       }
-       
-       public Class<T> getTupleClass() {
-               return this.tupleClass;
-       }
-       
-       @Override
-       public boolean isImmutableType() {
-               return false;
-       }
-
-       @Override
-       public int getLength() {
-               return -1;
-       }
-
-       public int getArity() {
-               return arity;
-       }
-
-       // We use this in the Aggregate and Distinct Operators to create 
instances
-       // of immutable Typles (i.e. Scala Tuples)
-       public abstract T createInstance(Object[] fields);
-
-       public abstract T createOrReuseInstance(Object[] fields, T reuse);
-
-       @Override
-       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-               for (int i = 0; i < arity; i++) {
-                       fieldSerializers[i].copy(source, target);
-               }
-       }
-       
-       @Override
-       public int hashCode() {
-               return 31 * Arrays.hashCode(fieldSerializers) + 
Objects.hash(tupleClass, arity);
-       }
-       
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof TupleSerializerBase) {
-                       TupleSerializerBase<?> other = (TupleSerializerBase<?>) 
obj;
-
-                       return other.canEqual(this) &&
-                               tupleClass == other.tupleClass &&
-                               Arrays.equals(fieldSerializers, 
other.fieldSerializers) &&
-                               arity == other.arity;
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof TupleSerializerBase;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
deleted file mode 100644
index 4b9629a..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.types.NormalizableKey;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.InstantiationUtil;
-
-import com.esotericsoftware.kryo.Kryo;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-/**
- * Comparator for all Value types that extend Key
- */
-public class ValueComparator<T extends Value & Comparable<T>> extends 
TypeComparator<T> {
-       
-       private static final long serialVersionUID = 1L;
-       
-       private final Class<T> type;
-       
-       private final boolean ascendingComparison;
-       
-       private transient T reference;
-       
-       private transient T tempReference;
-       
-       private transient Kryo kryo;
-
-       @SuppressWarnings("rawtypes")
-       private final TypeComparator[] comparators = new TypeComparator[] 
{this};
-
-       public ValueComparator(boolean ascending, Class<T> type) {
-               this.type = type;
-               this.ascendingComparison = ascending;
-       }
-       
-       @Override
-       public int hash(T record) {
-               return record.hashCode();
-       }
-
-       @Override
-       public void setReference(T toCompare) {
-               checkKryoInitialized();
-
-               reference = KryoUtils.copy(toCompare, kryo, new 
ValueSerializer<T>(type));
-       }
-
-       @Override
-       public boolean equalToReference(T candidate) {
-               return candidate.equals(this.reference);
-       }
-
-       @Override
-       public int compareToReference(TypeComparator<T> referencedComparator) {
-               T otherRef = ((ValueComparator<T>) 
referencedComparator).reference;
-               int comp = otherRef.compareTo(reference);
-               return ascendingComparison ? comp : -comp;
-       }
-       
-       @Override
-       public int compare(T first, T second) {
-               int comp = first.compareTo(second);
-               return ascendingComparison ? comp : -comp;
-       }
-       
-       @Override
-       public int compareSerialized(DataInputView firstSource, DataInputView 
secondSource) throws IOException {
-               if (reference == null) {
-                       reference = InstantiationUtil.instantiate(type, 
Value.class);
-               }
-               if (tempReference == null) {
-                       tempReference = InstantiationUtil.instantiate(type, 
Value.class);
-               }
-               
-               reference.read(firstSource);
-               tempReference.read(secondSource);
-               int comp = reference.compareTo(tempReference);
-               return ascendingComparison ? comp : -comp;
-       }
-
-       @Override
-       public boolean supportsNormalizedKey() {
-               return NormalizableKey.class.isAssignableFrom(type);
-       }
-
-       @Override
-       public int getNormalizeKeyLen() {
-               if (reference == null) {
-                       reference = InstantiationUtil.instantiate(type, 
Value.class);
-               }
-               
-               NormalizableKey<?> key = (NormalizableKey<?>) reference;
-               return key.getMaxNormalizedKeyLen();
-       }
-
-       @Override
-       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-               return keyBytes < getNormalizeKeyLen();
-       }
-
-       @Override
-       public void putNormalizedKey(T record, MemorySegment target, int 
offset, int numBytes) {
-               NormalizableKey<?> key = (NormalizableKey<?>) record;
-               key.copyNormalizedKey(target, offset, numBytes);
-       }
-
-       @Override
-       public boolean invertNormalizedKey() {
-               return !ascendingComparison;
-       }
-       
-       @Override
-       public TypeComparator<T> duplicate() {
-               return new ValueComparator<T>(ascendingComparison, type);
-       }
-       
-       private void checkKryoInitialized() {
-               if (this.kryo == null) {
-                       this.kryo = new Kryo();
-
-                       Kryo.DefaultInstantiatorStrategy instantiatorStrategy = 
new Kryo.DefaultInstantiatorStrategy();
-                       
instantiatorStrategy.setFallbackInstantiatorStrategy(new 
StdInstantiatorStrategy());
-                       kryo.setInstantiatorStrategy(instantiatorStrategy);
-
-                       this.kryo.setAsmEnabled(true);
-                       this.kryo.register(type);
-               }
-       }
-
-       @Override
-       public int extractKeys(Object record, Object[] target, int index) {
-               target[index] = record;
-               return 1;
-       }
-
-       @SuppressWarnings("rawtypes")
-       @Override
-       public TypeComparator[] getFlatComparators() {
-               return comparators;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // unsupported normalization
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public boolean supportsSerializationWithKeyNormalization() {
-               return false;
-       }
-
-       @Override
-       public void writeWithKeyNormalization(T record, DataOutputView target) 
throws IOException {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public T readWithKeyDenormalization(T reuse, DataInputView source) 
throws IOException {
-               throw new UnsupportedOperationException();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
deleted file mode 100644
index 9329866..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.InstantiationUtil;
-
-import com.esotericsoftware.kryo.Kryo;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-/**
- * Serializer for {@link Value} types. Uses the value's serialization methods, 
and uses
- * Kryo for deep object copies.
- *
- * @param <T> The type serialized.
- */
-public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
-
-       private static final long serialVersionUID = 1L;
-       
-       private final Class<T> type;
-       
-       private transient Kryo kryo;
-       
-       private transient T copyInstance;
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public ValueSerializer(Class<T> type) {
-               this.type = Preconditions.checkNotNull(type);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public boolean isImmutableType() {
-               return false;
-       }
-
-       @Override
-       public ValueSerializer<T> duplicate() {
-               return new ValueSerializer<T>(type);
-       }
-       
-       @Override
-       public T createInstance() {
-               return InstantiationUtil.instantiate(this.type);
-       }
-
-       @Override
-       public T copy(T from) {
-               checkKryoInitialized();
-
-               return KryoUtils.copy(from, kryo, this);
-       }
-       
-       @Override
-       public T copy(T from, T reuse) {
-               checkKryoInitialized();
-
-               return KryoUtils.copy(from, reuse, kryo, this);
-       }
-
-       @Override
-       public int getLength() {
-               return -1;
-       }
-
-       @Override
-       public void serialize(T value, DataOutputView target) throws 
IOException {
-               value.write(target);
-       }
-
-       @Override
-       public T deserialize(DataInputView source) throws IOException {
-               return deserialize(createInstance(), source);
-       }
-       
-       @Override
-       public T deserialize(T reuse, DataInputView source) throws IOException {
-               reuse.read(source);
-               return reuse;
-       }
-
-       @Override
-       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-               if (this.copyInstance == null) {
-                       this.copyInstance = InstantiationUtil.instantiate(type);
-               }
-               
-               this.copyInstance.read(source);
-               this.copyInstance.write(target);
-       }
-       
-       private void checkKryoInitialized() {
-               if (this.kryo == null) {
-                       this.kryo = new Kryo();
-
-                       Kryo.DefaultInstantiatorStrategy instantiatorStrategy = 
new Kryo.DefaultInstantiatorStrategy();
-                       
instantiatorStrategy.setFallbackInstantiatorStrategy(new 
StdInstantiatorStrategy());
-                       kryo.setInstantiatorStrategy(instantiatorStrategy);
-
-                       this.kryo.setAsmEnabled(true);
-                       this.kryo.register(type);
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public int hashCode() {
-               return this.type.hashCode();
-       }
-       
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof ValueSerializer) {
-                       ValueSerializer<?> other = (ValueSerializer<?>) obj;
-
-                       return other.canEqual(this) && type == other.type;
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof ValueSerializer;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
deleted file mode 100644
index a03369a..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.types.NormalizableKey;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.Writable;
-
-import com.esotericsoftware.kryo.Kryo;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-public class WritableComparator<T extends Writable & Comparable<T>> extends 
TypeComparator<T> {
-       
-       private static final long serialVersionUID = 1L;
-       
-       private Class<T> type;
-       
-       private final boolean ascendingComparison;
-       
-       private transient T reference;
-       
-       private transient T tempReference;
-       
-       private transient Kryo kryo;
-
-       @SuppressWarnings("rawtypes")
-       private final TypeComparator[] comparators = new TypeComparator[] 
{this};
-
-       public WritableComparator(boolean ascending, Class<T> type) {
-               this.type = type;
-               this.ascendingComparison = ascending;
-       }
-       
-       @Override
-       public int hash(T record) {
-               return record.hashCode();
-       }
-       
-       @Override
-       public void setReference(T toCompare) {
-               checkKryoInitialized();
-
-               reference = KryoUtils.copy(toCompare, kryo, new 
WritableSerializer<T>(type));
-       }
-       
-       @Override
-       public boolean equalToReference(T candidate) {
-               return candidate.equals(reference);
-       }
-       
-       @Override
-       public int compareToReference(TypeComparator<T> referencedComparator) {
-               T otherRef = ((WritableComparator<T>) 
referencedComparator).reference;
-               int comp = otherRef.compareTo(reference);
-               return ascendingComparison ? comp : -comp;
-       }
-       
-       @Override
-       public int compare(T first, T second) {
-               int comp = first.compareTo(second);
-               return ascendingComparison ? comp : -comp;
-       }
-       
-       @Override
-       public int compareSerialized(DataInputView firstSource, DataInputView 
secondSource) throws IOException {
-               ensureReferenceInstantiated();
-               ensureTempReferenceInstantiated();
-               
-               reference.readFields(firstSource);
-               tempReference.readFields(secondSource);
-               
-               int comp = reference.compareTo(tempReference);
-               return ascendingComparison ? comp : -comp;
-       }
-       
-       @Override
-       public boolean supportsNormalizedKey() {
-               return NormalizableKey.class.isAssignableFrom(type);
-       }
-       
-       @Override
-       public int getNormalizeKeyLen() {
-               ensureReferenceInstantiated();
-               
-               NormalizableKey<?> key = (NormalizableKey<?>) reference;
-               return key.getMaxNormalizedKeyLen();
-       }
-       
-       @Override
-       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-               return keyBytes < getNormalizeKeyLen();
-       }
-       
-       @Override
-       public void putNormalizedKey(T record, MemorySegment target, int 
offset, int numBytes) {
-               NormalizableKey<?> key = (NormalizableKey<?>) record;
-               key.copyNormalizedKey(target, offset, numBytes);
-       }
-       
-       @Override
-       public boolean invertNormalizedKey() {
-               return !ascendingComparison;
-       }
-       
-       @Override
-       public TypeComparator<T> duplicate() {
-               return new WritableComparator<T>(ascendingComparison, type);
-       }
-
-       @Override
-       public int extractKeys(Object record, Object[] target, int index) {
-               target[index] = record;
-               return 1;
-       }
-
-       @SuppressWarnings("rawtypes")
-       @Override
-       public TypeComparator[] getFlatComparators() {
-               return comparators;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // unsupported normalization
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public boolean supportsSerializationWithKeyNormalization() {
-               return false;
-       }
-       
-       @Override
-       public void writeWithKeyNormalization(T record, DataOutputView target) 
throws IOException {
-               throw new UnsupportedOperationException();
-       }
-       
-       @Override
-       public T readWithKeyDenormalization(T reuse, DataInputView source) 
throws IOException {
-               throw new UnsupportedOperationException();
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       private void checkKryoInitialized() {
-               if (this.kryo == null) {
-                       this.kryo = new Kryo();
-
-                       Kryo.DefaultInstantiatorStrategy instantiatorStrategy = 
new Kryo.DefaultInstantiatorStrategy();
-                       
instantiatorStrategy.setFallbackInstantiatorStrategy(new 
StdInstantiatorStrategy());
-                       kryo.setInstantiatorStrategy(instantiatorStrategy);
-
-                       this.kryo.setAsmEnabled(true);
-                       this.kryo.register(type);
-               }
-       }
-       
-       private void ensureReferenceInstantiated() {
-               if (reference == null) {
-                       reference = InstantiationUtil.instantiate(type, 
Writable.class);
-               }
-       }
-       
-       private void ensureTempReferenceInstantiated() {
-               if (tempReference == null) {
-                       tempReference = InstantiationUtil.instantiate(type, 
Writable.class);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
deleted file mode 100644
index 258d92c..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-
-import com.esotericsoftware.kryo.Kryo;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-import java.io.IOException;
-
-public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
-       
-       private static final long serialVersionUID = 1L;
-       
-       private final Class<T> typeClass;
-       
-       private transient Kryo kryo;
-       
-       private transient T copyInstance;
-       
-       public WritableSerializer(Class<T> typeClass) {
-               this.typeClass = typeClass;
-       }
-       
-       @SuppressWarnings("unchecked")
-       @Override
-       public T createInstance() {
-               if(typeClass == NullWritable.class) {
-                       return (T) NullWritable.get();
-               }
-               return InstantiationUtil.instantiate(typeClass);
-       }
-
-
-       
-       @Override
-       public T copy(T from) {
-               checkKryoInitialized();
-
-               return KryoUtils.copy(from, kryo, this);
-       }
-       
-       @Override
-       public T copy(T from, T reuse) {
-               checkKryoInitialized();
-
-               return KryoUtils.copy(from, reuse, kryo, this);
-       }
-       
-       @Override
-       public int getLength() {
-               return -1;
-       }
-       
-       @Override
-       public void serialize(T record, DataOutputView target) throws 
IOException {
-               record.write(target);
-       }
-       
-       @Override
-       public T deserialize(DataInputView source) throws IOException {
-               return deserialize(createInstance(), source);
-       }
-       
-       @Override
-       public T deserialize(T reuse, DataInputView source) throws IOException {
-               reuse.readFields(source);
-               return reuse;
-       }
-       
-       @Override
-       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-               ensureInstanceInstantiated();
-               copyInstance.readFields(source);
-               copyInstance.write(target);
-       }
-       
-       @Override
-       public boolean isImmutableType() {
-               return false;
-       }
-       
-       @Override
-       public WritableSerializer<T> duplicate() {
-               return new WritableSerializer<T>(typeClass);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       private void ensureInstanceInstantiated() {
-               if (copyInstance == null) {
-                       copyInstance = createInstance();
-               }
-       }
-       
-       private void checkKryoInitialized() {
-               if (this.kryo == null) {
-                       this.kryo = new Kryo();
-
-                       Kryo.DefaultInstantiatorStrategy instantiatorStrategy = 
new Kryo.DefaultInstantiatorStrategy();
-                       
instantiatorStrategy.setFallbackInstantiatorStrategy(new 
StdInstantiatorStrategy());
-                       kryo.setInstantiatorStrategy(instantiatorStrategy);
-
-                       this.kryo.setAsmEnabled(true);
-                       this.kryo.register(typeClass);
-               }
-       }
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public int hashCode() {
-               return this.typeClass.hashCode();
-       }
-       
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof WritableSerializer) {
-                       WritableSerializer<?> other = (WritableSerializer<?>) 
obj;
-
-                       return other.canEqual(this) && typeClass == 
other.typeClass;
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof WritableSerializer;
-       }
-}

Reply via email to