http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
new file mode 100644
index 0000000..c0c7797
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Field;
+import java.util.List;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NullKeyFieldException;
+import org.apache.flink.util.InstantiationUtil;
+
+
+/**
+ * Comparator for POJO types that compares objects by a list of key fields. Each key field is read
+ * reflectively and handed to the {@link TypeComparator} that was supplied for that field.
+ *
+ * <p>The reflection {@link Field} handles are transient; {@code writeObject()} and
+ * {@code readObject()} persist and restore them as (declaring class, field name) pairs.
+ *
+ * @param <T> the POJO type handled by this comparator
+ */
+public final class PojoComparator<T> extends CompositeTypeComparator<T> implements java.io.Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       // Reflection fields for the comp fields
+       private transient Field[] keyFields;
+
+       // comparators[i] compares the values read through keyFields[i]
+       private final TypeComparator<Object>[] comparators;
+
+       private final int[] normalizedKeyLengths;
+
+       private final int numLeadingNormalizableKeys;
+
+       private final int normalizableKeyPrefixLen;
+
+       private final boolean invertNormKey;
+
+       private TypeSerializer<T> serializer;
+
+       private final Class<T> type;
+
+       /**
+        * Creates a comparator over the given key fields.
+        *
+        * @param keyFields reflection handles of the key fields, in comparison order
+        * @param comparators one comparator per key field, in the same order as {@code keyFields}
+        * @param serializer serializer used by {@link #compareSerialized(DataInputView, DataInputView)}
+        * @param type the POJO class
+        * @throws IllegalArgumentException if the two arrays differ in length or contain a null entry
+        */
+       @SuppressWarnings("unchecked")
+       public PojoComparator(Field[] keyFields, TypeComparator<?>[] comparators, TypeSerializer<T> serializer, Class<T> type) {
+               // every method of this class indexes both arrays in parallel, so fail early on a mismatch
+               if (keyFields.length != comparators.length) {
+                       throw new IllegalArgumentException("The number of key fields (" + keyFields.length
+                                       + ") does not match the number of comparators (" + comparators.length + ")");
+               }
+
+               this.keyFields = keyFields;
+               this.comparators = (TypeComparator<Object>[]) comparators;
+
+               this.type = type;
+               this.serializer = serializer;
+
+               // set up auxiliary fields for normalized key support
+               this.normalizedKeyLengths = new int[keyFields.length];
+               int nKeys = 0;
+               int nKeyLen = 0;
+               boolean inverted = false;
+
+               for (int i = 0; i < this.comparators.length; i++) {
+                       TypeComparator<?> k = this.comparators[i];
+                       if (k == null) {
+                               throw new IllegalArgumentException("One of the passed comparators is null");
+                       }
+                       if (keyFields[i] == null) {
+                               throw new IllegalArgumentException("One of the passed reflection fields is null");
+                       }
+
+                       // as long as the leading keys support normalized keys, we can build up the composite key
+                       if (k.supportsNormalizedKey()) {
+                               if (i == 0) {
+                                       // the first comparator decides whether we need to invert the key direction
+                                       inverted = k.invertNormalizedKey();
+                               }
+                               else if (k.invertNormalizedKey() != inverted) {
+                                       // if a successor does not agree on the inversion direction, it cannot be part of the normalized key
+                                       break;
+                               }
+
+                               nKeys++;
+                               final int len = k.getNormalizeKeyLen();
+                               if (len < 0) {
+                                       throw new RuntimeException("Comparator " + k.getClass().getName() + " specifies an invalid length for the normalized key: " + len);
+                               }
+                               this.normalizedKeyLengths[i] = len;
+                               nKeyLen += this.normalizedKeyLengths[i];
+
+                               if (nKeyLen < 0) {
+                                       // overflow, which means we are out of budget for normalized key space anyways
+                                       nKeyLen = Integer.MAX_VALUE;
+                                       break;
+                               }
+                       } else {
+                               break;
+                       }
+               }
+               this.numLeadingNormalizableKeys = nKeys;
+               this.normalizableKeyPrefixLen = nKeyLen;
+               this.invertNormKey = inverted;
+       }
+
+       /**
+        * Copy constructor used by {@link #duplicate()}. The field comparators are duplicated and the
+        * serializer is copied through Java serialization; the remaining state is shared with the original.
+        */
+       @SuppressWarnings("unchecked")
+       private PojoComparator(PojoComparator<T> toClone) {
+               this.keyFields = toClone.keyFields;
+               this.comparators = new TypeComparator[toClone.comparators.length];
+
+               for (int i = 0; i < toClone.comparators.length; i++) {
+                       this.comparators[i] = toClone.comparators[i].duplicate();
+               }
+
+               this.normalizedKeyLengths = toClone.normalizedKeyLengths;
+               this.numLeadingNormalizableKeys = toClone.numLeadingNormalizableKeys;
+               this.normalizableKeyPrefixLen = toClone.normalizableKeyPrefixLen;
+               this.invertNormKey = toClone.invertNormKey;
+
+               this.type = toClone.type;
+
+               try {
+                       this.serializer = (TypeSerializer<T>) InstantiationUtil.deserializeObject(
+                                       InstantiationUtil.serializeObject(toClone.serializer), Thread.currentThread().getContextClassLoader());
+               } catch (IOException e) {
+                       throw new RuntimeException("Cannot copy serializer", e);
+               } catch (ClassNotFoundException e) {
+                       throw new RuntimeException("Cannot copy serializer", e);
+               }
+       }
+
+       /**
+        * Writes the non-transient state, followed by the number of key fields and, for each key field,
+        * its declaring class and its name.
+        */
+       private void writeObject(ObjectOutputStream out) throws IOException {
+               out.defaultWriteObject();
+               out.writeInt(keyFields.length);
+               for (Field field : keyFields) {
+                       out.writeObject(field.getDeclaringClass());
+                       out.writeUTF(field.getName());
+               }
+       }
+
+       /**
+        * Restores the state written by {@code writeObject()} and looks up every key field again by name,
+        * starting at the recorded class and walking up its superclasses.
+        */
+       private void readObject(ObjectInputStream in)
+                       throws IOException, ClassNotFoundException {
+               in.defaultReadObject();
+               int numKeyFields = in.readInt();
+               keyFields = new Field[numKeyFields];
+               for (int i = 0; i < numKeyFields; i++) {
+                       Class<?> clazz = (Class<?>) in.readObject();
+                       String fieldName = in.readUTF();
+                       // try superclasses as well
+                       while (clazz != null) {
+                               try {
+                                       Field field = clazz.getDeclaredField(fieldName);
+                                       field.setAccessible(true);
+                                       keyFields[i] = field;
+                                       break;
+                               } catch (NoSuchFieldException e) {
+                                       clazz = clazz.getSuperclass();
+                               }
+                       }
+                       if (keyFields[i] == null) {
+                               throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup."
+                                               + " (" + fieldName + ")");
+                       }
+               }
+       }
+
+       /**
+        * Returns the reflection handles of the key fields (the internal array, not a copy).
+        */
+       public Field[] getKeyFields() {
+               return this.keyFields;
+       }
+
+       /**
+        * Appends the field comparators to the given list; composite comparators are asked to add their
+        * own flat comparators instead of being added themselves.
+        */
+       @SuppressWarnings({ "rawtypes", "unchecked" })
+       @Override
+       public void getFlatComparator(List<TypeComparator> flatComparators) {
+               for (int i = 0; i < comparators.length; i++) {
+                       if (comparators[i] instanceof CompositeTypeComparator) {
+                               ((CompositeTypeComparator) comparators[i]).getFlatComparator(flatComparators);
+                       } else {
+                               flatComparators.add(comparators[i]);
+                       }
+               }
+       }
+
+       /**
+        * This method is handling the IllegalAccess exceptions of Field.get()
+        *
+        * @param field the field to read
+        * @param object the object to read the field from
+        * @return the value of the field on the given object
+        * @throws NullKeyFieldException if reading the field raises a NullPointerException
+        */
+       public final Object accessField(Field field, Object object) {
+               try {
+                       object = field.get(object);
+               } catch (NullPointerException npex) {
+                       throw new NullKeyFieldException("Unable to access field "+field+" on object "+object);
+               } catch (IllegalAccessException iaex) {
+                       // keep the original exception as the cause so the failure stays diagnosable
+                       throw new RuntimeException("This should not happen since we call setAccesssible(true) in PojoTypeInfo."
+                                       + " fields: " + field + " obj: " + object, iaex);
+               }
+               return object;
+       }
+
+       /**
+        * Combines the hashes of all key field values, multiplying the running code by a salt from
+        * {@code TupleComparatorBase.HASH_SALT} before each field hash is added.
+        */
+       @Override
+       public int hash(T value) {
+               int code = 0;
+               for (int i = 0; i < this.keyFields.length; i++) {
+                       code *= TupleComparatorBase.HASH_SALT[i & 0x1F];
+                       try {
+                               code += this.comparators[i].hash(accessField(keyFields[i], value));
+                       } catch (NullPointerException npe) {
+                               throw new RuntimeException("A NullPointerException occured while accessing a key field in a POJO. " +
+                                               "Most likely, the value grouped/joined on is null. Field name: "+keyFields[i].getName(), npe);
+                       }
+               }
+               return code;
+       }
+
+       /**
+        * Sets each field comparator's reference to the matching key field value of {@code toCompare}.
+        */
+       @Override
+       public void setReference(T toCompare) {
+               for (int i = 0; i < this.keyFields.length; i++) {
+                       this.comparators[i].setReference(accessField(keyFields[i], toCompare));
+               }
+       }
+
+       /**
+        * Returns true only if every key field of the candidate equals the reference of its comparator.
+        */
+       @Override
+       public boolean equalToReference(T candidate) {
+               for (int i = 0; i < this.keyFields.length; i++) {
+                       if (!this.comparators[i].equalToReference(accessField(keyFields[i], candidate))) {
+                               return false;
+                       }
+               }
+               return true;
+       }
+
+       /**
+        * Compares the references field by field and returns the first non-zero result, or 0.
+        *
+        * @throws NullKeyFieldException if a field comparison raises a NullPointerException
+        */
+       @Override
+       public int compareToReference(TypeComparator<T> referencedComparator) {
+               PojoComparator<T> other = (PojoComparator<T>) referencedComparator;
+
+               // declared outside the try block because the catch clause reports the failing field
+               int i = 0;
+               try {
+                       for (; i < this.keyFields.length; i++) {
+                               int cmp = this.comparators[i].compareToReference(other.comparators[i]);
+                               if (cmp != 0) {
+                                       return cmp;
+                               }
+                       }
+                       return 0;
+               }
+               catch (NullPointerException npex) {
+                       throw new NullKeyFieldException(this.keyFields[i].toString());
+               }
+       }
+
+       /**
+        * Compares two objects key field by key field and returns the first non-zero result, or 0.
+        */
+       @Override
+       public int compare(T first, T second) {
+               for (int i = 0; i < keyFields.length; i++) {
+                       int cmp = comparators[i].compare(accessField(keyFields[i], first), accessField(keyFields[i], second));
+                       if (cmp != 0) {
+                               return cmp;
+                       }
+               }
+
+               return 0;
+       }
+
+       /**
+        * Deserializes one object from each source and compares them with {@link #compare(Object, Object)}.
+        */
+       @Override
+       public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+               T first = this.serializer.createInstance();
+               T second = this.serializer.createInstance();
+
+               first = this.serializer.deserialize(first, firstSource);
+               second = this.serializer.deserialize(second, secondSource);
+
+               return this.compare(first, second);
+       }
+
+       /** Returns true if at least the first key field contributes to a normalized key. */
+       @Override
+       public boolean supportsNormalizedKey() {
+               return this.numLeadingNormalizableKeys > 0;
+       }
+
+       /** Returns the summed normalized key length of the leading normalizable key fields. */
+       @Override
+       public int getNormalizeKeyLen() {
+               return this.normalizableKeyPrefixLen;
+       }
+
+       /**
+        * Returns true if not all key fields are part of the normalized key, if the key length
+        * overflowed to {@code Integer.MAX_VALUE}, or if it exceeds {@code keyBytes}.
+        */
+       @Override
+       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+               return this.numLeadingNormalizableKeys < this.keyFields.length ||
+                               this.normalizableKeyPrefixLen == Integer.MAX_VALUE ||
+                               this.normalizableKeyPrefixLen > keyBytes;
+       }
+
+       /**
+        * Writes the normalized keys of the leading normalizable fields one after another, until the
+        * fields or the {@code numBytes} budget are exhausted. The last field written may be truncated.
+        */
+       @Override
+       public void putNormalizedKey(T value, MemorySegment target, int offset, int numBytes) {
+               // short-circuit '&&': the byte budget only matters while there are fields left
+               for (int i = 0; i < this.numLeadingNormalizableKeys && numBytes > 0; i++) {
+                       int len = Math.min(numBytes, this.normalizedKeyLengths[i]);
+                       this.comparators[i].putNormalizedKey(accessField(keyFields[i], value), target, offset, len);
+                       numBytes -= len;
+                       offset += len;
+               }
+       }
+
+       /** Returns the inversion flag taken from the first field comparator. */
+       @Override
+       public boolean invertNormalizedKey() {
+               return this.invertNormKey;
+       }
+
+
+       /** Serialization with key normalization is not supported by this comparator. */
+       @Override
+       public boolean supportsSerializationWithKeyNormalization() {
+               return false;
+       }
+
+       /** Not supported; always throws {@link UnsupportedOperationException}. */
+       @Override
+       public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       /** Not supported; always throws {@link UnsupportedOperationException}. */
+       @Override
+       public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       /** Returns a new comparator created through the private copy constructor. */
+       @Override
+       public PojoComparator<T> duplicate() {
+               return new PojoComparator<T>(this);
+       }
+
+       /**
+        * Lets every field comparator extract its keys from the matching field value into
+        * {@code target}, starting at {@code index}.
+        *
+        * @return the sum of the values returned by the field comparators' {@code extractKeys} calls
+        */
+       @Override
+       public int extractKeys(Object record, Object[] target, int index) {
+               int localIndex = index;
+               for (int i = 0; i < comparators.length; i++) {
+                       localIndex += comparators[i].extractKeys(accessField(keyFields[i], record), target, localIndex);
+               }
+               return localIndex - index;
+       }
+
+       // --------------------------------------------------------------------------------------------
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
new file mode 100644
index 0000000..de24956
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+
+public final class PojoSerializer<T> extends TypeSerializer<T> {
+
+       // Flag bits of the header byte that serialize() writes first; they are constants, hence final.
+       private static final byte IS_NULL = 1;
+       private static final byte NO_SUBCLASS = 2;
+       private static final byte IS_SUBCLASS = 4;
+       private static final byte IS_TAGGED_SUBCLASS = 8;
+
+       private static final long serialVersionUID = 1L;
+
+       // The class this serializer was created for; any other runtime class is handled as a subclass.
+       private final Class<T> clazz;
+
+       // fieldSerializers[i] handles the value of fields[i]
+       private final TypeSerializer<Object>[] fieldSerializers;
+
+       // Set to fieldSerializers.length in the constructor
+       private final int numFields;
+
+       // Maps each registered subclass to its tag, which is also its index in registeredSerializers
+       private final Map<Class<?>, Integer> registeredClasses;
+
+       private final TypeSerializer<?>[] registeredSerializers;
+
+       private final ExecutionConfig executionConfig;
+
+       // Serializers created lazily for subclasses; rebuilt empty in readObject()
+       private transient Map<Class<?>, TypeSerializer<?>> subclassSerializerCache;
+       // Context class loader captured in the constructor and again in readObject()
+       private transient ClassLoader cl;
+       // We need to handle these ourselves in writeObject()/readObject()
+       private transient Field[] fields;
+
+       @SuppressWarnings("unchecked")
+       public PojoSerializer(
+                       Class<T> clazz,
+                       TypeSerializer<?>[] fieldSerializers,
+                       Field[] fields,
+                       ExecutionConfig executionConfig) {
+
+               this.clazz = Preconditions.checkNotNull(clazz);
+               this.fieldSerializers = (TypeSerializer<Object>[]) 
Preconditions.checkNotNull(fieldSerializers);
+               this.fields = Preconditions.checkNotNull(fields);
+               this.numFields = fieldSerializers.length;
+               this.executionConfig = 
Preconditions.checkNotNull(executionConfig);
+
+               LinkedHashSet<Class<?>> registeredPojoTypes = 
executionConfig.getRegisteredPojoTypes();
+
+               for (int i = 0; i < numFields; i++) {
+                       this.fields[i].setAccessible(true);
+               }
+
+               cl = Thread.currentThread().getContextClassLoader();
+
+               subclassSerializerCache = new HashMap<Class<?>, 
TypeSerializer<?>>();
+
+               // We only want those classes that are not our own class and 
are actually sub-classes.
+               List<Class<?>> cleanedTaggedClasses = new 
ArrayList<Class<?>>(registeredPojoTypes.size());
+               for (Class<?> registeredClass: registeredPojoTypes) {
+                       if (registeredClass.equals(clazz)) {
+                               continue;
+                       }
+                       if (!clazz.isAssignableFrom(registeredClass)) {
+                               continue;
+                       }
+                       cleanedTaggedClasses.add(registeredClass);
+
+               }
+               this.registeredClasses = new LinkedHashMap<Class<?>, 
Integer>(cleanedTaggedClasses.size());
+               registeredSerializers = new 
TypeSerializer[cleanedTaggedClasses.size()];
+
+               int id = 0;
+               for (Class<?> registeredClass: cleanedTaggedClasses) {
+                       this.registeredClasses.put(registeredClass, id);
+                       TypeInformation<?> typeInfo = 
TypeExtractor.createTypeInfo(registeredClass);
+                       registeredSerializers[id] = 
typeInfo.createSerializer(executionConfig);
+
+                       id++;
+               }
+       }
+
+       private void writeObject(ObjectOutputStream out)
+                       throws IOException, ClassNotFoundException {
+               out.defaultWriteObject();
+               out.writeInt(fields.length);
+               for (Field field: fields) {
+                       out.writeObject(field.getDeclaringClass());
+                       out.writeUTF(field.getName());
+               }
+       }
+
+       private void readObject(ObjectInputStream in)
+                       throws IOException, ClassNotFoundException {
+               in.defaultReadObject();
+               int numFields = in.readInt();
+               fields = new Field[numFields];
+               for (int i = 0; i < numFields; i++) {
+                       Class<?> clazz = (Class<?>)in.readObject();
+                       String fieldName = in.readUTF();
+                       fields[i] = null;
+                       // try superclasses as well
+                       while (clazz != null) {
+                               try {
+                                       fields[i] = 
clazz.getDeclaredField(fieldName);
+                                       fields[i].setAccessible(true);
+                                       break;
+                               } catch (NoSuchFieldException e) {
+                                       clazz = clazz.getSuperclass();
+                               }
+                       }
+                       if (fields[i] == null) {
+                               throw new RuntimeException("Class resolved at 
TaskManager is not compatible with class read during Plan setup."
+                                               + " (" + fieldName + ")");
+                       }
+               }
+
+               cl = Thread.currentThread().getContextClassLoader();
+               subclassSerializerCache = new HashMap<Class<?>, 
TypeSerializer<?>>();
+       }
+
+       private TypeSerializer<?> getSubclassSerializer(Class<?> subclass) {
+               TypeSerializer<?> result = 
subclassSerializerCache.get(subclass);
+               if (result == null) {
+
+                       TypeInformation<?> typeInfo = 
TypeExtractor.createTypeInfo(subclass);
+                       result = typeInfo.createSerializer(executionConfig);
+                       if (result instanceof PojoSerializer) {
+                               PojoSerializer<?> subclassSerializer = 
(PojoSerializer<?>) result;
+                               subclassSerializer.copyBaseFieldOrder(this);
+                       }
+                       subclassSerializerCache.put(subclass, result);
+
+               }
+               return result;
+       }
+
+       /**
+        * Checks whether the given field equals one of the fields handled by this serializer.
+        */
+       @SuppressWarnings("unused")
+       private boolean hasField(Field f) {
+               boolean found = false;
+               for (int i = 0; i < fields.length && !found; i++) {
+                       found = f.equals(fields[i]);
+               }
+               return found;
+       }
+
+       /**
+        * Placeholder that currently does nothing. It is invoked on a freshly created subclass
+        * serializer with the base class serializer as argument.
+        *
+        * @param baseSerializer the serializer of the base class (currently unused)
+        */
+       private void copyBaseFieldOrder(PojoSerializer<?> baseSerializer) {
+               // do nothing for now, but in the future, adapt subclass serializer to have same
+               // ordering as base class serializer so that binary comparison on base class fields
+               // can work
+       }
+       
+       /**
+        * Always reports the POJO type as mutable.
+        *
+        * @return {@code false}
+        */
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       /**
+        * Duplicates all field serializers. If every duplicate is the very same instance as its
+        * original, this serializer is returned; otherwise a new serializer is built from the duplicates.
+        */
+       @Override
+       public PojoSerializer<T> duplicate() {
+               final int count = fieldSerializers.length;
+               final TypeSerializer<?>[] copies = new TypeSerializer[count];
+
+               // a duplicate that is a different instance marks its field serializer as stateful
+               boolean anyStateful = false;
+               for (int i = 0; i < count; i++) {
+                       copies[i] = fieldSerializers[i].duplicate();
+                       anyStateful |= (copies[i] != fieldSerializers[i]);
+               }
+
+               if (!anyStateful) {
+                       return this;
+               }
+               return new PojoSerializer<T>(clazz, copies, fields, executionConfig);
+       }
+
+       
+       /**
+        * Instantiates the POJO class and initializes its fields.
+        *
+        * @return a new instance, or {@code null} if the class is an interface or abstract
+        */
+       @Override
+       public T createInstance() {
+               final boolean instantiable = !clazz.isInterface() && !Modifier.isAbstract(clazz.getModifiers());
+               if (!instantiable) {
+                       return null;
+               }
+
+               final T instance;
+               try {
+                       instance = clazz.newInstance();
+                       initializeFields(instance);
+               }
+               catch (Exception e) {
+                       throw new RuntimeException("Cannot instantiate class.", e);
+               }
+               return instance;
+       }
+
+       protected void initializeFields(T t) {
+               for (int i = 0; i < numFields; i++) {
+                       try {
+                               fields[i].set(t, 
fieldSerializers[i].createInstance());
+                       } catch (IllegalAccessException e) {
+                               throw new RuntimeException("Cannot initialize 
fields.", e);
+                       }
+               }
+       }
+
+       /**
+        * Creates a deep copy of the given object.
+        *
+        * <p>Objects whose runtime class is exactly the serializer's class are copied field by field
+        * into a new instance; objects of any other class are handed to the subclass serializer.
+        *
+        * @param from the object to copy, may be {@code null}
+        * @return the copy, or {@code null} if {@code from} is {@code null}
+        */
+       @Override
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       public T copy(T from) {
+               if (from == null) {
+                       return null;
+               }
+
+               Class<?> actualType = from.getClass();
+               if (actualType != clazz) {
+                       // subclass
+                       TypeSerializer subclassSerializer = getSubclassSerializer(actualType);
+                       return (T) subclassSerializer.copy(from);
+               }
+
+               // no subclass
+               T target;
+               try {
+                       target = (T) from.getClass().newInstance();
+               }
+               catch (Throwable t) {
+                       throw new RuntimeException("Cannot instantiate class.", t);
+               }
+
+               try {
+                       for (int i = 0; i < numFields; i++) {
+                               Object value = fields[i].get(from);
+                               fields[i].set(target, value != null ? fieldSerializers[i].copy(value) : null);
+                       }
+               } catch (IllegalAccessException e) {
+                       // keep the cause; the original code discarded it
+                       throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e);
+               }
+               return target;
+       }
+       
+       @Override
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       public T copy(T from, T reuse) {
+               if (from == null) {
+                       return null;
+               }
+
+               Class<?> actualType = from.getClass();
+               if (reuse == null || actualType != reuse.getClass()) {
+                       // cannot reuse, do a non-reuse copy
+                       return copy(from);
+               }
+
+               if (actualType == clazz) {
+                       try {
+                               for (int i = 0; i < numFields; i++) {
+                                       Object value = fields[i].get(from);
+                                       if (value != null) {
+                                               Object reuseValue = 
fields[i].get(reuse);
+                                               Object copy;
+                                               if(reuseValue != null) {
+                                                       copy = 
fieldSerializers[i].copy(value, reuseValue);
+                                               }
+                                               else {
+                                                       copy = 
fieldSerializers[i].copy(value);
+                                               }
+                                               fields[i].set(reuse, copy);
+                                       }
+                                       else {
+                                               fields[i].set(reuse, null);
+                                       }
+                               }
+                       } catch (IllegalAccessException e) {
+                               throw new RuntimeException("Error during POJO 
copy, this should not happen since we check the fields" + "before.");
+                       }
+               } else {
+                       TypeSerializer subclassSerializer = 
getSubclassSerializer(actualType);
+                       reuse = (T) subclassSerializer.copy(from, reuse);
+               }
+
+               return reuse;
+       }
+
+       /**
+        * Returns the constant -1 as the length of this serializer's data type.
+        *
+        * @return {@code -1}
+        */
+       @Override
+       public int getLength() {
+               return -1;
+       }
+
+
+       @Override
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       public void serialize(T value, DataOutputView target) throws 
IOException {
+               int flags = 0;
+               // handle null values
+               if (value == null) {
+                       flags |= IS_NULL;
+                       target.writeByte(flags);
+                       return;
+               }
+
+               Integer subclassTag = -1;
+               Class<?> actualClass = value.getClass();
+               TypeSerializer subclassSerializer = null;
+               if (clazz != actualClass) {
+                       subclassTag = registeredClasses.get(actualClass);
+                       if (subclassTag != null) {
+                               flags |= IS_TAGGED_SUBCLASS;
+                               subclassSerializer = 
registeredSerializers[subclassTag];
+                       } else {
+                               flags |= IS_SUBCLASS;
+                               subclassSerializer = 
getSubclassSerializer(actualClass);
+                       }
+               } else {
+                       flags |= NO_SUBCLASS;
+               }
+
+               target.writeByte(flags);
+
+               if ((flags & IS_SUBCLASS) != 0) {
+                       target.writeUTF(actualClass.getName());
+               } else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
+                       target.writeByte(subclassTag);
+               }
+
+
+               if ((flags & NO_SUBCLASS) != 0) {
+                       try {
+                               for (int i = 0; i < numFields; i++) {
+                                       Object o = fields[i].get(value);
+                                       if (o == null) {
+                                               target.writeBoolean(true); // 
null field handling
+                                       } else {
+                                               target.writeBoolean(false);
+                                               
fieldSerializers[i].serialize(o, target);
+                                       }
+                               }
+                       } catch (IllegalAccessException e) {
+                               throw new RuntimeException("Error during POJO 
copy, this should not happen since we check the fields" + "before.");
+
+                       }
+               } else {
+                       // subclass
+                       if (subclassSerializer != null) {
+                               subclassSerializer.serialize(value, target);
+                       }
+               }
+       }
+
+       /**
+        * Reads a POJO from the source view into a newly created instance.
+        *
+        * <p>The flag byte read first decides the path: a null marker returns {@code null};
+        * a subclass (by name or by tag) is instantiated and read by its subclass serializer;
+        * otherwise an instance of {@code clazz} is created and its fields are read one by
+        * one, each preceded by a boolean null marker.
+        *
+        * @param source the view to read from
+        * @return the deserialized object, or {@code null} if a null value was written
+        * @throws IOException if reading from the source fails
+        * @throws RuntimeException if the subclass cannot be loaded or a field cannot be set
+        */
+       @Override
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       public T deserialize(DataInputView source) throws IOException {
+               int flags = source.readByte();
+               if((flags & IS_NULL) != 0) {
+                       return null;
+               }
+
+               T target;
+
+               Class<?> actualSubclass = null;
+               TypeSerializer subclassSerializer = null;
+
+               if ((flags & IS_SUBCLASS) != 0) {
+                       String subclassName = source.readUTF();
+                       try {
+                               actualSubclass = Class.forName(subclassName, true, cl);
+                       } catch (ClassNotFoundException e) {
+                               throw new RuntimeException("Cannot instantiate class.", e);
+                       }
+                       subclassSerializer = getSubclassSerializer(actualSubclass);
+                       target = (T) subclassSerializer.createInstance();
+                       // also initialize fields for which the subclass serializer is not responsible
+                       initializeFields(target);
+               } else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
+
+                       int subclassTag = source.readByte();
+                       subclassSerializer = registeredSerializers[subclassTag];
+                       target = (T) subclassSerializer.createInstance();
+                       // also initialize fields for which the subclass serializer is not responsible
+                       initializeFields(target);
+               } else {
+                       target = createInstance();
+               }
+
+               if ((flags & NO_SUBCLASS) != 0) {
+                       try {
+                               for (int i = 0; i < numFields; i++) {
+                                       boolean isNull = source.readBoolean();
+                                       if (isNull) {
+                                               fields[i].set(target, null);
+                                       } else {
+                                               Object field = fieldSerializers[i].deserialize(source);
+                                               fields[i].set(target, field);
+                                       }
+                               }
+                       } catch (IllegalAccessException e) {
+                               // keep the cause so the offending field access can be diagnosed
+                               throw new RuntimeException(
+                                               "Error during POJO deserialization, this should not happen since we check the fields before.", e);
+                       }
+               } else {
+                       if (subclassSerializer != null) {
+                               target = (T) subclassSerializer.deserialize(target, source);
+                       }
+               }
+               return target;
+       }
+       
+       /**
+        * Reads a POJO from the source view, reusing the given instance where possible.
+        *
+        * <p>The reuse object is only used if it is non-null and its class is exactly the
+        * class that was serialized; otherwise a fresh instance is created. For instances of
+        * {@code clazz}, non-null current field values of the reuse object are handed to the
+        * field serializers as reuse targets.
+        *
+        * @param reuse an instance that may be overwritten, may be {@code null}
+        * @param source the view to read from
+        * @return the deserialized object (not necessarily {@code reuse}), or {@code null} if a
+        *         null value was written
+        * @throws IOException if reading from the source fails
+        * @throws RuntimeException if the subclass cannot be loaded or a field cannot be accessed
+        */
+       @Override
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       public T deserialize(T reuse, DataInputView source) throws IOException {
+
+               // handle null values
+               int flags = source.readByte();
+               if((flags & IS_NULL) != 0) {
+                       return null;
+               }
+
+               Class<?> subclass = null;
+               TypeSerializer subclassSerializer = null;
+               if ((flags & IS_SUBCLASS) != 0) {
+                       String subclassName = source.readUTF();
+                       try {
+                               subclass = Class.forName(subclassName, true, cl);
+                       } catch (ClassNotFoundException e) {
+                               throw new RuntimeException("Cannot instantiate class.", e);
+                       }
+                       subclassSerializer = getSubclassSerializer(subclass);
+
+                       if (reuse == null || subclass != reuse.getClass()) {
+                               // cannot reuse
+                               reuse = (T) subclassSerializer.createInstance();
+                               // also initialize fields for which the subclass serializer is not responsible
+                               initializeFields(reuse);
+                       }
+               } else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
+                       int subclassTag = source.readByte();
+                       subclassSerializer = registeredSerializers[subclassTag];
+
+                       if (reuse == null || ((PojoSerializer)subclassSerializer).clazz != reuse.getClass()) {
+                               // cannot reuse
+                               reuse = (T) subclassSerializer.createInstance();
+                               // also initialize fields for which the subclass serializer is not responsible
+                               initializeFields(reuse);
+                       }
+               } else {
+                       if (reuse == null || clazz != reuse.getClass()) {
+                               reuse = createInstance();
+                       }
+               }
+
+               if ((flags & NO_SUBCLASS) != 0) {
+                       try {
+                               for (int i = 0; i < numFields; i++) {
+                                       boolean isNull = source.readBoolean();
+                                       if (isNull) {
+                                               fields[i].set(reuse, null);
+                                       } else {
+                                               Object field;
+
+                                               // hand the current field value to the field serializer for reuse, if there is one
+                                               Object reuseField = fields[i].get(reuse);
+                                               if(reuseField != null) {
+                                                       field = fieldSerializers[i].deserialize(reuseField, source);
+                                               }
+                                               else {
+                                                       field = fieldSerializers[i].deserialize(source);
+                                               }
+
+                                               fields[i].set(reuse, field);
+                                       }
+                               }
+                       } catch (IllegalAccessException e) {
+                               // keep the cause so the offending field access can be diagnosed
+                               throw new RuntimeException(
+                                               "Error during POJO deserialization, this should not happen since we check the fields before.", e);
+                       }
+               } else {
+                       if (subclassSerializer != null) {
+                               reuse = (T) subclassSerializer.deserialize(reuse, source);
+                       }
+               }
+
+               return reuse;
+       }
+
+       /**
+        * Copies one serialized record from the source view to the target view without
+        * materializing the object.
+        *
+        * <p>The flag byte and the subclass identification (class name or tag) are forwarded
+        * unchanged. The payload is then copied field by field for instances of {@code clazz},
+        * or by the subclass serializer otherwise.
+        *
+        * @param source the view to read the record from
+        * @param target the view to write the record to
+        * @throws IOException if reading or writing fails
+        * @throws RuntimeException if a named subclass cannot be loaded
+        */
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws IOException {
+               // forward the flags
+               final int flags = source.readByte();
+               target.writeByte(flags);
+
+               // a null value consists of the flag byte only
+               if ((flags & IS_NULL) != 0) {
+                       return;
+               }
+
+               TypeSerializer<?> delegate = null;
+               if ((flags & IS_SUBCLASS) != 0) {
+                       final String name = source.readUTF();
+                       target.writeUTF(name);
+                       final Class<?> loaded;
+                       try {
+                               loaded = Class.forName(name, true, Thread.currentThread().getContextClassLoader());
+                       } catch (ClassNotFoundException e) {
+                               throw new RuntimeException("Cannot instantiate class.", e);
+                       }
+                       delegate = getSubclassSerializer(loaded);
+               } else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
+                       final int tag = source.readByte();
+                       target.writeByte(tag);
+                       delegate = registeredSerializers[tag];
+               }
+
+               if ((flags & NO_SUBCLASS) == 0) {
+                       // subclass payload: handled entirely by the subclass serializer, if any
+                       if (delegate != null) {
+                               delegate.copy(source, target);
+                       }
+                       return;
+               }
+
+               // exact type: each field is a null marker followed by the field data
+               for (int pos = 0; pos < numFields; pos++) {
+                       final boolean fieldIsNull = source.readBoolean();
+                       target.writeBoolean(fieldIsNull);
+                       if (fieldIsNull) {
+                               continue;
+                       }
+                       fieldSerializers[pos].copy(source, target);
+               }
+       }
+       
+       @Override
+       public int hashCode() {
+               return 31 * (31 * Arrays.hashCode(fieldSerializers) + 
Arrays.hashCode(registeredSerializers)) +
+                       Objects.hash(clazz, numFields, registeredClasses);
+       }
+       
+       /**
+        * Two POJO serializers are equal if they can equal each other, serialize the same
+        * class, and have equal field serializers, registered serializers, field count and
+        * registered classes.
+        */
+       @Override
+       public boolean equals(Object obj) {
+               if (!(obj instanceof PojoSerializer)) {
+                       return false;
+               }
+
+               final PojoSerializer<?> that = (PojoSerializer<?>) obj;
+
+               return that.canEqual(this)
+                       && clazz == that.clazz
+                       && Arrays.equals(fieldSerializers, that.fieldSerializers)
+                       && Arrays.equals(registeredSerializers, that.registeredSerializers)
+                       && numFields == that.numFields
+                       && registeredClasses.equals(that.registeredClasses);
+       }
+
+       /**
+        * Tells whether the given object may be considered equal to this serializer.
+        *
+        * @param obj the object to check
+        * @return {@code true} if {@code obj} is a {@code PojoSerializer}
+        */
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof PojoSerializer;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java
new file mode 100644
index 0000000..4b734a7
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.InstantiationUtil;
+
+/**
+ * A {@link TypeComparatorFactory} that holds a {@link TypeComparator} instance and
+ * transports it by writing the comparator object into a {@link Configuration} and
+ * reading it back from there.
+ *
+ * @param <T> the type handled by the comparator
+ */
+public final class RuntimeComparatorFactory<T> implements TypeComparatorFactory<T>, java.io.Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+
+       /** Configuration key under which the comparator object is stored. */
+       private static final String CONFIG_KEY = "SER_DATA";
+
+       /** The comparator handed out by this factory; null until set or read from a config. */
+       private TypeComparator<T> comparator;
+
+
+       /** Creates an uninitialized factory; it must be set up via {@link #readParametersFromConfig}. */
+       public RuntimeComparatorFactory() {}
+
+       /**
+        * Creates a factory for the given comparator.
+        *
+        * @param comparator the comparator to hand out and to write into the configuration
+        */
+       public RuntimeComparatorFactory(TypeComparator<T> comparator) {
+               this.comparator = comparator;
+       }
+
+       /**
+        * Writes the comparator object into the given configuration.
+        *
+        * @param config the configuration to write to
+        * @throws RuntimeException if the comparator cannot be written
+        */
+       @Override
+       public void writeParametersToConfig(Configuration config) {
+               try {
+                       InstantiationUtil.writeObjectToConfig(comparator, config, CONFIG_KEY);
+               }
+               catch (Exception e) {
+                       throw new RuntimeException("Could not serialize comparator into the configuration.", e);
+               }
+       }
+
+       /**
+        * Reads the comparator object back from the given configuration.
+        *
+        * @param config the configuration to read from
+        * @param cl the class loader passed on to the object reader
+        * @throws ClassNotFoundException if a required class cannot be found; passed on unwrapped
+        * @throws RuntimeException if the comparator cannot be read for any other reason
+        */
+       @SuppressWarnings("unchecked")
+       @Override
+       public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {
+               try {
+                       comparator = (TypeComparator<T>) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY, cl);
+               }
+               catch (ClassNotFoundException e) {
+                       // rethrown as-is so that it is not wrapped by the generic handler below
+                       throw e;
+               }
+               catch (Exception e) {
+                       throw new RuntimeException("Could not load comparator from the configuration.", e);
+               }
+       }
+
+       /**
+        * Returns the comparator held by this factory.
+        *
+        * @return the stored comparator instance
+        * @throws RuntimeException if no comparator has been set or read yet
+        */
+       @Override
+       public TypeComparator<T> createComparator() {
+               if (comparator != null) {
+                       return comparator;
+               } else {
+                       throw new RuntimeException("ComparatorFactory has not been initialized from configuration.");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
new file mode 100644
index 0000000..31e28f7
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
+
+/**
+ * A {@link TypePairComparatorFactory} that builds {@link GenericPairComparator}s from
+ * the two given type comparators.
+ *
+ * @param <T1> the first type
+ * @param <T2> the second type
+ */
+public final class RuntimePairComparatorFactory<T1, T2>
+               implements TypePairComparatorFactory<T1, T2>, java.io.Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Creates a pair comparator with the first type's comparator in the leading position.
+        */
+       @Override
+       public TypePairComparator<T1, T2> createComparator12(
+                       TypeComparator<T1> comparator1,
+                       TypeComparator<T2> comparator2) {
+               final TypePairComparator<T1, T2> forward =
+                               new GenericPairComparator<T1, T2>(comparator1, comparator2);
+               return forward;
+       }
+
+       /**
+        * Creates a pair comparator with the second type's comparator in the leading position.
+        */
+       @Override
+       public TypePairComparator<T2, T1> createComparator21(
+                       TypeComparator<T1> comparator1,
+                       TypeComparator<T2> comparator2) {
+               final TypePairComparator<T2, T1> backward =
+                               new GenericPairComparator<T2, T1>(comparator2, comparator1);
+               return backward;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
new file mode 100644
index 0000000..96aff73
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.InstantiationUtil;
+
+/**
+ * A {@link TypeSerializerFactory} that holds a {@link TypeSerializer} together with the
+ * class of the data type, and transports both by writing them into a
+ * {@link Configuration} and reading them back from there.
+ *
+ * @param <T> the type handled by the serializer
+ */
+public final class RuntimeSerializerFactory<T> implements TypeSerializerFactory<T>, java.io.Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+
+       /** Configuration key under which the serializer object is stored. */
+       private static final String CONFIG_KEY_SER = "SER_DATA";
+
+       /** Configuration key under which the data type class is stored. */
+       private static final String CONFIG_KEY_CLASS = "CLASS_DATA";
+
+
+       /** The serializer handed out by this factory; null until set or read from a config. */
+       private TypeSerializer<T> serializer;
+
+       /** True until the stored serializer instance itself has been handed out once. */
+       private boolean firstSerializer = true;
+
+       /** The class of the data type; null until set or read from a config. */
+       private Class<T> clazz;
+
+       // Because we read the class from the TaskConfig and instantiate ourselves
+       public RuntimeSerializerFactory() {}
+
+       /**
+        * Creates a factory for the given serializer and data type class.
+        *
+        * @param serializer the serializer to hand out, must not be null
+        * @param clazz the class of the data type, must not be null
+        * @throws NullPointerException if either argument is null
+        */
+       public RuntimeSerializerFactory(TypeSerializer<T> serializer, Class<T> clazz) {
+               if (serializer == null || clazz == null) {
+                       throw new NullPointerException();
+               }
+
+               this.clazz = clazz;
+               this.serializer = serializer;
+       }
+
+
+       /**
+        * Writes the data type class and the serializer object into the given configuration.
+        *
+        * @param config the configuration to write to
+        * @throws RuntimeException if either object cannot be written
+        */
+       @Override
+       public void writeParametersToConfig(Configuration config) {
+               try {
+                       InstantiationUtil.writeObjectToConfig(clazz, config, CONFIG_KEY_CLASS);
+                       InstantiationUtil.writeObjectToConfig(serializer, config, CONFIG_KEY_SER);
+               }
+               catch (Exception e) {
+                       throw new RuntimeException("Could not serialize serializer into the configuration.", e);
+               }
+       }
+
+       /**
+        * Reads the data type class and the serializer object back from the given
+        * configuration and resets the first-serializer marker.
+        *
+        * @param config the configuration to read from, must not be null
+        * @param cl the class loader passed on to the object reader, must not be null
+        * @throws NullPointerException if either argument is null
+        * @throws ClassNotFoundException if a required class cannot be found; passed on unwrapped
+        * @throws RuntimeException if reading fails for any other reason
+        */
+       @SuppressWarnings("unchecked")
+       @Override
+       public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {
+               if (config == null || cl == null) {
+                       throw new NullPointerException();
+               }
+
+               try {
+                       this.clazz = (Class<T>) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_CLASS, cl);
+                       this.serializer = (TypeSerializer<T>)  InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_SER, cl);
+                       firstSerializer = true;
+               }
+               catch (ClassNotFoundException e) {
+                       // rethrown as-is so that it is not wrapped by the generic handler below
+                       throw e;
+               }
+               catch (Exception e) {
+                       throw new RuntimeException("Could not load deserializer from the configuration.", e);
+               }
+       }
+
+       /**
+        * Returns a serializer. The first call hands out the stored instance itself; every
+        * later call hands out the result of {@code duplicate()} on it.
+        *
+        * @return the stored serializer or a duplicate of it
+        * @throws RuntimeException if no serializer has been set or read yet
+        */
+       @Override
+       public TypeSerializer<T> getSerializer() {
+               if (this.serializer != null) {
+                       if (firstSerializer) {
+                               firstSerializer = false;
+                               return this.serializer;
+                       } else {
+                               return this.serializer.duplicate();
+                       }
+               } else {
+                       throw new RuntimeException("SerializerFactory has not been initialized from configuration.");
+               }
+       }
+
+       /**
+        * Returns the class of the data type.
+        *
+        * @return the data type class, or null if the factory has not been initialized
+        */
+       @Override
+       public Class<T> getDataType() {
+               return clazz;
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Combines the hashes of the data type class and the serializer. A missing
+        * (null) component contributes 0, so an uninitialized factory can be hashed.
+        */
+       @Override
+       public int hashCode() {
+               final int classHash = clazz == null ? 0 : clazz.hashCode();
+               final int serializerHash = serializer == null ? 0 : serializer.hashCode();
+               return classHash ^ serializerHash;
+       }
+
+       /**
+        * Two factories are equal if they refer to the same data type class and hold equal
+        * serializers. Uninitialized (null) serializers are only equal to each other.
+        */
+       @Override
+       public boolean equals(Object obj) {
+               // instanceof is false for null, so no separate null check is needed
+               if (obj instanceof RuntimeSerializerFactory) {
+                       RuntimeSerializerFactory<?> other = (RuntimeSerializerFactory<?>) obj;
+
+                       final boolean sameSerializer = this.serializer == null
+                                       ? other.serializer == null
+                                       : this.serializer.equals(other.serializer);
+
+                       return this.clazz == other.clazz && sameSerializer;
+               } else {
+                       return false;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
new file mode 100644
index 0000000..a06ff1a
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class Tuple0Serializer extends TupleSerializer<Tuple0> {
+       
+       private static final long serialVersionUID = 1278813169022975971L;
+
+       public static final Tuple0Serializer INSTANCE = new Tuple0Serializer();
+
+       // 
------------------------------------------------------------------------
+       
+       private Tuple0Serializer() {
+               super(Tuple0.class, new TypeSerializer<?>[0]);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public Tuple0Serializer duplicate() {
+               return this;
+       }
+
+       @Override
+       public Tuple0 createInstance() {
+               return Tuple0.INSTANCE;
+       }
+
+       @Override
+       public Tuple0 createInstance(Object[] fields) {
+               if (fields == null || fields.length == 0) {
+                       return Tuple0.INSTANCE;
+               }
+
+               throw new UnsupportedOperationException(
+                               "Tuple0 cannot take any data, as it has zero 
fields.");
+       }
+
+       @Override
+       public Tuple0 copy(Tuple0 from) {
+               return from;
+       }
+
+       @Override
+       public Tuple0 copy(Tuple0 from, Tuple0 reuse) {
+               return reuse;
+       }
+
+       @Override
+       public int getLength() {
+               return 1;
+       }
+
+       @Override
+       public void serialize(Tuple0 record, DataOutputView target) throws 
IOException {
+               target.writeByte(42);
+       }
+
+       @Override
+       public Tuple0 deserialize(DataInputView source) throws IOException {
+               source.readByte();
+               return Tuple0.INSTANCE;
+       }
+
+       @Override
+       public Tuple0 deserialize(Tuple0 reuse, DataInputView source) throws 
IOException {
+               source.readByte();
+               return reuse;
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               target.writeByte(source.readByte());
+       }
+
+       // 
------------------------------------------------------------------------
+
+
+       @Override
+       public int hashCode() {
+               return Tuple0Serializer.class.hashCode();
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof Tuple0Serializer) {
+                       Tuple0Serializer other = (Tuple0Serializer) obj;
+
+                       return other.canEqual(this);
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof Tuple0Serializer;
+       }
+
+       @Override
+       public String toString() {
+               return "Tuple0Serializer";
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
new file mode 100644
index 0000000..875ecc2
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.KeyFieldOutOfBoundsException;
+import org.apache.flink.types.NullFieldException;
+import org.apache.flink.types.NullKeyFieldException;
+
+
+public final class TupleComparator<T extends Tuple> extends 
TupleComparatorBase<T> {
+
+       private static final long serialVersionUID = 1L;
+       
+       public TupleComparator(int[] keyPositions, TypeComparator<?>[] 
comparators, TypeSerializer<?>[] serializers) {
+               super(keyPositions, comparators, serializers);
+       }
+       
+       private TupleComparator(TupleComparator<T> toClone) {
+               super(toClone);
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       //  Comparator Methods
+       // 
--------------------------------------------------------------------------------------------
+       
+       @SuppressWarnings("unchecked")
+       @Override
+       public int hash(T value) {
+               int i = 0;
+               try {
+                       int code = 
this.comparators[0].hash(value.getFieldNotNull(keyPositions[0]));
+                       for (i = 1; i < this.keyPositions.length; i++) {
+                               code *= HASH_SALT[i & 0x1F]; // salt code with 
(i % HASH_SALT.length)-th salt component
+                               code += 
this.comparators[i].hash(value.getFieldNotNull(keyPositions[i]));
+                       }
+                       return code;
+               }
+               catch (NullFieldException nfex) {
+                       throw new NullKeyFieldException(nfex);
+               }
+               catch (IndexOutOfBoundsException iobex) {
+                       throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public void setReference(T toCompare) {
+               int i = 0;
+               try {
+                       for (; i < this.keyPositions.length; i++) {
+                               
this.comparators[i].setReference(toCompare.getFieldNotNull(this.keyPositions[i]));
+                       }
+               }
+               catch (NullFieldException nfex) {
+                       throw new NullKeyFieldException(nfex);
+               }
+               catch (IndexOutOfBoundsException iobex) {
+                       throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public boolean equalToReference(T candidate) {
+               int i = 0;
+               try {
+                       for (; i < this.keyPositions.length; i++) {
+                               if 
(!this.comparators[i].equalToReference(candidate.getFieldNotNull(this.keyPositions[i])))
 {
+                                       return false;
+                               }
+                       }
+                       return true;
+               }
+               catch (NullFieldException nfex) {
+                       throw new NullKeyFieldException(nfex);
+               }
+               catch (IndexOutOfBoundsException iobex) {
+                       throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+               }
+       }
+       
+       @SuppressWarnings("unchecked")
+       @Override
+       public int compare(T first, T second) {
+               int i = 0;
+               try {
+                       for (; i < keyPositions.length; i++) {
+                               int keyPos = keyPositions[i];
+                               int cmp = 
comparators[i].compare(first.getFieldNotNull(keyPos), 
second.getFieldNotNull(keyPos));
+
+                               if (cmp != 0) {
+                                       return cmp;
+                               }
+                       }
+                       return 0;
+               } 
+               catch (NullFieldException nfex) {
+                       throw new NullKeyFieldException(nfex);
+               }
+               catch (IndexOutOfBoundsException iobex) {
+                       throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+               }
+       }
+
+       /**
+        * Writes the normalized keys of the leading normalizable key fields back to back into
+        * the target segment, until all of them are written or the byte budget is exhausted.
+        *
+        * @param value    the tuple whose key fields are encoded
+        * @param target   the memory segment passed on to the per-key comparators
+        * @param offset   position in {@code target} at which the first key starts
+        * @param numBytes total number of bytes that may be written
+        * @throws NullKeyFieldException if reading a key field raises a {@link NullFieldException}
+        *         or a {@link NullPointerException} occurs while writing
+        */
+       @SuppressWarnings("unchecked")
+       @Override
+       public void putNormalizedKey(T value, MemorySegment target, int offset, int numBytes) {
+               // lives outside the try block so the NPE handler can name the offending key position
+               int keyIdx = 0;
+               int bytesLeft = numBytes;
+               int writePos = offset;
+               try {
+                       while (keyIdx < this.numLeadingNormalizableKeys && bytesLeft > 0) {
+                               // a key gets at most its own length, and never more than the remaining budget
+                               final int chunk = Math.min(bytesLeft, this.normalizedKeyLengths[keyIdx]);
+                               this.comparators[keyIdx].putNormalizedKey(
+                                               value.getFieldNotNull(this.keyPositions[keyIdx]), target, writePos, chunk);
+                               bytesLeft -= chunk;
+                               writePos += chunk;
+                               keyIdx++;
+                       }
+               } catch (NullFieldException e) {
+                       throw new NullKeyFieldException(e);
+               } catch (NullPointerException e) {
+                       throw new NullKeyFieldException(this.keyPositions[keyIdx]);
+               }
+       }
+
+       /**
+        * Lets every key comparator extract its keys from the corresponding tuple field into
+        * {@code target}, filling the array from {@code index} onwards.
+        *
+        * @param record the record to read from; it is cast to {@link Tuple}
+        * @param target the array receiving the extracted keys
+        * @param index  first slot of {@code target} to fill
+        * @return the sum of the counts reported by the key comparators
+        */
+       @Override
+       public int extractKeys(Object record, Object[] target, int index) {
+               int extracted = 0;
+               for (int pos = 0; pos < comparators.length; pos++) {
+                       final Object field = ((Tuple) record).getField(keyPositions[pos]);
+                       // each comparator continues where the previous one stopped
+                       extracted += comparators[pos].extractKeys(field, target, index + extracted);
+               }
+               return extracted;
+       }
+
+       /**
+        * Creates a copy of this comparator through the cloning constructor.
+        *
+        * @return a new {@code TupleComparator} built from this instance
+        */
+       @Override
+       public TypeComparator<T> duplicate() {
+               return new TupleComparator<T>(this);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
new file mode 100644
index 0000000..28169e5
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.KeyFieldOutOfBoundsException;
+import org.apache.flink.types.NullKeyFieldException;
+
+
+/**
+ * Base class for composite comparators that compare tuple-like records on an ordered list
+ * of key fields. It holds the per-key comparators, the serializers needed to deserialize
+ * the leading fields, and the bookkeeping for building a composite normalized key out of
+ * the leading key fields.
+ *
+ * @param <T> the record type compared by this comparator
+ */
+public abstract class TupleComparatorBase<T> extends CompositeTypeComparator<T> implements java.io.Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       /** key positions describe which fields are keys in what order */
+       protected int[] keyPositions;
+
+       /** comparators for the key fields, in the same order as the key fields */
+       @SuppressWarnings("rawtypes")
+       protected TypeComparator[] comparators;
+
+       /** normalized key length of each leading normalizable key, indexed like {@link #comparators} */
+       protected int[] normalizedKeyLengths;
+
+       /** number of leading key fields that take part in the composite normalized key */
+       protected int numLeadingNormalizableKeys;
+
+       /** summed normalized key length of the leading keys; {@code Integer.MAX_VALUE} on overflow */
+       protected int normalizableKeyPrefixLen;
+
+       /** inversion flag of the normalized key, as decided by the first key comparator */
+       protected boolean invertNormKey;
+
+
+       /** serializers to deserialize the first n fields for comparison */
+       @SuppressWarnings("rawtypes")
+       protected TypeSerializer[] serializers;
+
+       // cache for the deserialized field objects
+       protected transient Object[] deserializedFields1;
+       protected transient Object[] deserializedFields2;
+
+
+       /**
+        * Creates the comparator and derives the normalized-key bookkeeping from the given
+        * key comparators. The arrays are stored as passed in, not copied.
+        *
+        * @param keyPositions positions of the key fields, in comparison order
+        * @param comparators  one comparator per key, in the same order as {@code keyPositions}
+        * @param serializers  serializers used to deserialize the leading fields in
+        *                     {@link #compareSerialized(DataInputView, DataInputView)}
+        * @throws RuntimeException if a comparator reports a negative normalized key length
+        */
+       @SuppressWarnings("unchecked")
+       public TupleComparatorBase(int[] keyPositions, TypeComparator<?>[] comparators, TypeSerializer<?>[] serializers) {
+               // set the default utils
+               this.keyPositions = keyPositions;
+               this.comparators = (TypeComparator<Object>[]) comparators;
+               this.serializers = (TypeSerializer<Object>[]) serializers;
+
+               // set up auxiliary fields for normalized key support
+               this.normalizedKeyLengths = new int[keyPositions.length];
+               int nKeys = 0;
+               int nKeyLen = 0;
+               boolean inverted = false;
+
+               for (int i = 0; i < this.keyPositions.length; i++) {
+                       TypeComparator<?> k = this.comparators[i];
+
+                       // as long as the leading keys support normalized keys, we can build up the composite key
+                       if (k.supportsNormalizedKey()) {
+                               if (i == 0) {
+                                       // the first comparator decides whether we need to invert the key direction
+                                       inverted = k.invertNormalizedKey();
+                               }
+                               else if (k.invertNormalizedKey() != inverted) {
+                                       // if a successor does not agree on the inversion direction, it cannot be part of the normalized key
+                                       break;
+                               }
+
+                               nKeys++;
+                               final int len = k.getNormalizeKeyLen();
+                               if (len < 0) {
+                                       throw new RuntimeException("Comparator " + k.getClass().getName() + " specifies an invalid length for the normalized key: " + len);
+                               }
+                               this.normalizedKeyLengths[i] = len;
+                               nKeyLen += len;
+
+                               if (nKeyLen < 0) {
+                                       // overflow, which means we are out of budget for normalized key space anyways
+                                       nKeyLen = Integer.MAX_VALUE;
+                                       break;
+                               }
+                       } else {
+                               break;
+                       }
+               }
+               this.numLeadingNormalizableKeys = nKeys;
+               this.normalizableKeyPrefixLen = nKeyLen;
+               this.invertNormKey = inverted;
+       }
+
+       /**
+        * Cloning constructor; delegates to {@link #privateDuplicate(TupleComparatorBase)}.
+        *
+        * @param toClone the comparator to copy
+        */
+       protected TupleComparatorBase(TupleComparatorBase<T> toClone) {
+               privateDuplicate(toClone);
+       }
+
+       // We need this because we cannot call the cloning constructor from the
+       // ScalaTupleComparator
+       /**
+        * Copies the state of another comparator into this one. The key positions and the
+        * normalized key lengths are shared with the original; every serializer and comparator
+        * is duplicated individually.
+        *
+        * @param toClone the comparator to copy from
+        */
+       protected void privateDuplicate(TupleComparatorBase<T> toClone) {
+               // copy fields and serializer factories
+               this.keyPositions = toClone.keyPositions;
+
+               this.serializers = new TypeSerializer[toClone.serializers.length];
+               for (int i = 0; i < toClone.serializers.length; i++) {
+                       this.serializers[i] = toClone.serializers[i].duplicate();
+               }
+
+               this.comparators = new TypeComparator[toClone.comparators.length];
+               for (int i = 0; i < toClone.comparators.length; i++) {
+                       this.comparators[i] = toClone.comparators[i].duplicate();
+               }
+
+               this.normalizedKeyLengths = toClone.normalizedKeyLengths;
+               this.numLeadingNormalizableKeys = toClone.numLeadingNormalizableKeys;
+               this.normalizableKeyPrefixLen = toClone.normalizableKeyPrefixLen;
+               this.invertNormKey = toClone.invertNormKey;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Key positions and flat comparators
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * @return the key positions array itself (not a copy)
+        */
+       protected int[] getKeyPositions() {
+               return this.keyPositions;
+       }
+
+
+       /**
+        * Appends the key comparators to the given list. A key comparator that is itself a
+        * {@link CompositeTypeComparator} is asked to add its own flat comparators instead.
+        *
+        * @param flatComparators the list that receives the comparators
+        */
+       @SuppressWarnings({ "rawtypes", "unchecked" })
+       @Override
+       public void getFlatComparator(List<TypeComparator> flatComparators) {
+               for(int i = 0; i < comparators.length; i++) {
+                       if(comparators[i] instanceof CompositeTypeComparator) {
+                               ((CompositeTypeComparator)comparators[i]).getFlatComparator(flatComparators);
+                       } else {
+                               flatComparators.add(comparators[i]);
+                       }
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Comparator Methods
+       // --------------------------------------------------------------------------------------------
+
+
+       /**
+        * Compares the references of this comparator's key comparators with those of the
+        * given comparator, key by key. The first non-zero result is returned.
+        *
+        * @param referencedComparator the other comparator; it is cast to {@code TupleComparatorBase}
+        * @return the first non-zero per-key result, or {@code 0} if all keys compare equal
+        * @throws NullKeyFieldException if a {@link NullPointerException} occurs
+        * @throws KeyFieldOutOfBoundsException if an {@link IndexOutOfBoundsException} occurs
+        */
+       @Override
+       public int compareToReference(TypeComparator<T> referencedComparator) {
+               TupleComparatorBase<T> other = (TupleComparatorBase<T>) referencedComparator;
+
+               int i = 0;
+               try {
+                       for (; i < this.keyPositions.length; i++) {
+                               @SuppressWarnings("unchecked")
+                               int cmp = this.comparators[i].compareToReference(other.comparators[i]);
+                               if (cmp != 0) {
+                                       return cmp;
+                               }
+                       }
+                       return 0;
+               }
+               catch (NullPointerException npex) {
+                       throw new NullKeyFieldException(keyPositions[i]);
+               }
+               catch (IndexOutOfBoundsException iobex) {
+                       throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+               }
+       }
+
+       /**
+        * Compares two serialized records. First deserializes the leading fields of both
+        * records (one per serializer) into the cached field arrays, then compares the key
+        * fields with the key comparators.
+        *
+        * @param firstSource  input positioned at the first record
+        * @param secondSource input positioned at the second record
+        * @return the first non-zero per-key result, or {@code 0} if all keys compare equal
+        * @throws IOException if deserialization fails
+        * @throws NullKeyFieldException if a {@link NullPointerException} occurs; the reported
+        *         position is the field being deserialized or the key field being compared
+        * @throws KeyFieldOutOfBoundsException if an {@link IndexOutOfBoundsException} occurs
+        */
+       @SuppressWarnings("unchecked")
+       @Override
+       public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+               if (deserializedFields1 == null) {
+                       instantiateDeserializationUtils();
+               }
+
+               // Phase 1: here 'i' is a field index. It ranges over the serializers and may be
+               // larger than keyPositions.length, so it must not be used to index keyPositions.
+               int i = 0;
+               try {
+                       for (; i < serializers.length; i++) {
+                               deserializedFields1[i] = serializers[i].deserialize(deserializedFields1[i], firstSource);
+                               deserializedFields2[i] = serializers[i].deserialize(deserializedFields2[i], secondSource);
+                       }
+               } catch (NullPointerException npex) {
+                       throw new NullKeyFieldException(i);
+               } catch (IndexOutOfBoundsException iobex) {
+                       throw new KeyFieldOutOfBoundsException(i, iobex);
+               }
+
+               // Phase 2: here 'i' is a key index, always smaller than keyPositions.length.
+               i = 0;
+               try {
+                       for (; i < keyPositions.length; i++) {
+                               int keyPos = keyPositions[i];
+                               int cmp = comparators[i].compare(deserializedFields1[keyPos], deserializedFields2[keyPos]);
+                               if (cmp != 0) {
+                                       return cmp;
+                               }
+                       }
+
+                       return 0;
+               } catch (NullPointerException npex) {
+                       throw new NullKeyFieldException(keyPositions[i]);
+               } catch (IndexOutOfBoundsException iobex) {
+                       throw new KeyFieldOutOfBoundsException(keyPositions[i], iobex);
+               }
+       }
+
+       /**
+        * @return {@code true} if at least one leading key takes part in the normalized key
+        */
+       @Override
+       public boolean supportsNormalizedKey() {
+               return this.numLeadingNormalizableKeys > 0;
+       }
+
+       /**
+        * @return the summed normalized key length of the leading normalizable keys
+        */
+       @Override
+       public int getNormalizeKeyLen() {
+               return this.normalizableKeyPrefixLen;
+       }
+
+       /**
+        * Reports whether a normalized key of the given size covers only a prefix of the keys.
+        *
+        * @param keyBytes number of bytes available for the normalized key
+        * @return {@code true} if not every key is part of the normalized key, if the summed
+        *         length overflowed, or if the summed length exceeds {@code keyBytes}
+        */
+       @Override
+       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+               return this.numLeadingNormalizableKeys < this.keyPositions.length ||
+                               this.normalizableKeyPrefixLen == Integer.MAX_VALUE ||
+                               this.normalizableKeyPrefixLen > keyBytes;
+       }
+
+       /**
+        * @return the inversion flag taken from the first key comparator
+        */
+       @Override
+       public boolean invertNormalizedKey() {
+               return this.invertNormKey;
+       }
+
+
+       /**
+        * @return always {@code false}
+        */
+       @Override
+       public boolean supportsSerializationWithKeyNormalization() {
+               return false;
+       }
+
+       /**
+        * Not supported.
+        *
+        * @throws UnsupportedOperationException always
+        */
+       @Override
+       public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       /**
+        * Not supported.
+        *
+        * @throws UnsupportedOperationException always
+        */
+       @Override
+       public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Allocates the two field caches used by
+        * {@link #compareSerialized(DataInputView, DataInputView)} and fills each slot with a
+        * fresh instance from the matching serializer.
+        */
+       protected final void instantiateDeserializationUtils() {
+               this.deserializedFields1 = new Object[this.serializers.length];
+               this.deserializedFields2 = new Object[this.serializers.length];
+
+               for (int i = 0; i < this.serializers.length; i++) {
+                       this.deserializedFields1[i] = this.serializers[i].createInstance();
+                       this.deserializedFields2[i] = this.serializers[i].createInstance();
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * A sequence of prime numbers to be used for salting the computed hash values.
+        * Based on some empirical evidence, we are using a 32-element subsequence of the
+        * OEIS sequence #A068652 (numbers such that every cyclic permutation is a prime).
+        *
+        * @see <a href="http://en.wikipedia.org/wiki/List_of_prime_numbers">http://en.wikipedia.org/wiki/List_of_prime_numbers</a>
+        * @see <a href="http://oeis.org/A068652">http://oeis.org/A068652</a>
+        */
+       public static final int[] HASH_SALT = new int[] {
+               73   , 79   , 97   , 113  , 131  , 197  , 199  , 311   ,
+               337  , 373  , 719  , 733  , 919  , 971  , 991  , 1193  ,
+               1931 , 3119 , 3779 , 7793 , 7937 , 9311 , 9377 , 11939 ,
+               19391, 19937, 37199, 39119, 71993, 91193, 93719, 93911 };
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
new file mode 100644
index 0000000..0897063
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.NullFieldException;
+
+
+/**
+ * Serializer for {@link Tuple} subclasses. Every operation is carried out field by field,
+ * delegating to the serializer registered for the respective field position.
+ *
+ * @param <T> the concrete tuple type
+ */
+public class TupleSerializer<T extends Tuple> extends TupleSerializerBase<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * @param tupleClass       the tuple class instantiated by this serializer
+        * @param fieldSerializers one serializer per tuple field, in field order
+        */
+       public TupleSerializer(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
+               super(tupleClass, fieldSerializers);
+       }
+
+       /**
+        * Duplicates all field serializers. If none of them returns an instance different
+        * from itself, this serializer is returned unchanged; otherwise a new serializer is
+        * built from the duplicates.
+        */
+       @Override
+       public TupleSerializer<T> duplicate() {
+               final TypeSerializer<?>[] copies = new TypeSerializer<?>[fieldSerializers.length];
+               // becomes true as soon as one field serializer hands out a distinct duplicate
+               boolean anyDistinct = false;
+
+               for (int pos = 0; pos < copies.length; pos++) {
+                       copies[pos] = fieldSerializers[pos].duplicate();
+                       anyDistinct |= copies[pos] != fieldSerializers[pos];
+               }
+
+               return anyDistinct ? new TupleSerializer<T>(tupleClass, copies) : this;
+       }
+
+       /**
+        * Instantiates the tuple class and sets every field to a fresh instance obtained
+        * from the field's serializer.
+        *
+        * @throws RuntimeException wrapping any exception raised along the way
+        */
+       @Override
+       public T createInstance() {
+               try {
+                       final T instance = tupleClass.newInstance();
+                       for (int pos = 0; pos < arity; pos++) {
+                               instance.setField(fieldSerializers[pos].createInstance(), pos);
+                       }
+                       return instance;
+               } catch (Exception e) {
+                       throw new RuntimeException("Cannot instantiate tuple.", e);
+               }
+       }
+
+       /**
+        * Instantiates the tuple class and sets its fields to the given values.
+        *
+        * @param fields the field values, in field order
+        * @throws RuntimeException wrapping any exception raised along the way
+        */
+       @Override
+       public T createInstance(Object[] fields) {
+               try {
+                       final T instance = tupleClass.newInstance();
+                       for (int pos = 0; pos < arity; pos++) {
+                               instance.setField(fields[pos], pos);
+                       }
+                       return instance;
+               } catch (Exception e) {
+                       throw new RuntimeException("Cannot instantiate tuple.", e);
+               }
+       }
+
+       /**
+        * Overwrites the fields of {@code reuse} with the given values and returns it.
+        *
+        * @param fields the field values, in field order
+        * @param reuse  the tuple to fill
+        */
+       @Override
+       public T createOrReuseInstance(Object[] fields, T reuse) {
+               int pos = 0;
+               while (pos < arity) {
+                       reuse.setField(fields[pos], pos);
+                       pos++;
+               }
+               return reuse;
+       }
+
+       /**
+        * Copies {@code from} field by field into a newly instantiated tuple.
+        */
+       @Override
+       public T copy(T from) {
+               final T result = instantiateRaw();
+               for (int pos = 0; pos < arity; pos++) {
+                       result.setField(fieldSerializers[pos].copy(from.getField(pos)), pos);
+               }
+               return result;
+       }
+
+       /**
+        * Copies {@code from} field by field into {@code reuse}, offering the current field
+        * value of {@code reuse} to the field serializer.
+        */
+       @Override
+       public T copy(T from, T reuse) {
+               for (int pos = 0; pos < arity; pos++) {
+                       reuse.setField(fieldSerializers[pos].copy(from.getField(pos), reuse.getField(pos)), pos);
+               }
+               return reuse;
+       }
+
+       /**
+        * Writes all fields, in field order, with their field serializers.
+        *
+        * @throws NullFieldException if a field serializer raises a {@link NullPointerException};
+        *         the exception carries the position of that field
+        */
+       @Override
+       public void serialize(T value, DataOutputView target) throws IOException {
+               for (int pos = 0; pos < arity; pos++) {
+                       // the field is read outside the try block: only the write is translated
+                       final Object field = value.getField(pos);
+                       try {
+                               fieldSerializers[pos].serialize(field, target);
+                       } catch (NullPointerException e) {
+                               throw new NullFieldException(pos, e);
+                       }
+               }
+       }
+
+       /**
+        * Reads all fields, in field order, into a newly instantiated tuple.
+        */
+       @Override
+       public T deserialize(DataInputView source) throws IOException {
+               final T result = instantiateRaw();
+               for (int pos = 0; pos < arity; pos++) {
+                       result.setField(fieldSerializers[pos].deserialize(source), pos);
+               }
+               return result;
+       }
+
+       /**
+        * Reads all fields, in field order, into {@code reuse}, offering the current field
+        * value of {@code reuse} to the field serializer.
+        */
+       @Override
+       public T deserialize(T reuse, DataInputView source) throws IOException {
+               for (int pos = 0; pos < arity; pos++) {
+                       reuse.setField(fieldSerializers[pos].deserialize(reuse.getField(pos), source), pos);
+               }
+               return reuse;
+       }
+
+       /**
+        * Instantiates the tuple class without touching its fields.
+        *
+        * @throws RuntimeException wrapping any exception raised by the instantiation
+        */
+       private T instantiateRaw() {
+               try {
+                       return tupleClass.newInstance();
+               } catch (Exception e) {
+                       throw new RuntimeException("Cannot instantiate tuple.", e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
new file mode 100644
index 0000000..fc657a1
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+
+/**
+ * Common base of the tuple serializers: keeps the tuple class, the per-field serializers
+ * and the arity, and implements the parts that do not depend on how a tuple is built.
+ *
+ * @param <T> the tuple type handled by the serializer
+ */
+public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       protected final Class<T> tupleClass;
+
+       protected final TypeSerializer<Object>[] fieldSerializers;
+
+       /** number of fields; equals the length of {@link #fieldSerializers} */
+       protected final int arity;
+
+       /**
+        * @param tupleClass       the tuple class; must not be {@code null}
+        * @param fieldSerializers one serializer per field; must not be {@code null}
+        */
+       @SuppressWarnings("unchecked")
+       public TupleSerializerBase(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
+               this.tupleClass = Preconditions.checkNotNull(tupleClass);
+               final TypeSerializer<?>[] checked = Preconditions.checkNotNull(fieldSerializers);
+               this.fieldSerializers = (TypeSerializer<Object>[]) checked;
+               this.arity = fieldSerializers.length;
+       }
+
+       public Class<T> getTupleClass() {
+               return this.tupleClass;
+       }
+
+       /** @return always {@code false} */
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       /** @return always {@code -1} */
+       @Override
+       public int getLength() {
+               return -1;
+       }
+
+       public int getArity() {
+               return arity;
+       }
+
+       // We use this in the Aggregate and Distinct Operators to create instances
+       // of immutable Tuples (i.e. Scala Tuples)
+       public abstract T createInstance(Object[] fields);
+
+       public abstract T createOrReuseInstance(Object[] fields, T reuse);
+
+       /**
+        * Copies one serialized tuple from {@code source} to {@code target} by letting every
+        * field serializer copy its field, in field order.
+        */
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws IOException {
+               for (TypeSerializer<Object> fieldSerializer : fieldSerializers) {
+                       fieldSerializer.copy(source, target);
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               final int serializersHash = Arrays.hashCode(fieldSerializers);
+               final int shapeHash = Objects.hash(tupleClass, arity);
+               return 31 * serializersHash + shapeHash;
+       }
+
+       /**
+        * Two tuple serializers are equal if the other one accepts this one via
+        * {@link #canEqual(Object)} and tuple class, field serializers and arity all match.
+        */
+       @Override
+       public boolean equals(Object obj) {
+               if (!(obj instanceof TupleSerializerBase)) {
+                       return false;
+               }
+
+               final TupleSerializerBase<?> that = (TupleSerializerBase<?>) obj;
+               return that.canEqual(this)
+                               && tupleClass == that.tupleClass
+                               && Arrays.equals(fieldSerializers, that.fieldSerializers)
+                               && arity == that.arity;
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof TupleSerializerBase;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
new file mode 100644
index 0000000..4b9629a
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.InstantiationUtil;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+/**
+ * Comparator for {@link Value} types that are also {@link Comparable}. Comparison relies
+ * on the type's own {@code compareTo}; the result is negated for descending order.
+ *
+ * @param <T> the value type compared by this comparator
+ */
+public class ValueComparator<T extends Value & Comparable<T>> extends TypeComparator<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final Class<T> type;
+
+       private final boolean ascendingComparison;
+
+       /** reference value; also reused as scratch object by compareSerialized and getNormalizeKeyLen */
+       private transient T reference;
+
+       /** scratch object for the second record in compareSerialized */
+       private transient T tempReference;
+
+       /** created lazily by checkKryoInitialized */
+       private transient Kryo kryo;
+
+       @SuppressWarnings("rawtypes")
+       private final TypeComparator[] comparators = new TypeComparator[] {this};
+
+       /**
+        * @param ascending {@code true} for ascending order, {@code false} for descending order
+        * @param type      the value class to compare
+        */
+       public ValueComparator(boolean ascending, Class<T> type) {
+               this.type = type;
+               this.ascendingComparison = ascending;
+       }
+
+       /** @return the record's own {@code hashCode()} */
+       @Override
+       public int hash(T record) {
+               return record.hashCode();
+       }
+
+       /**
+        * Stores a copy of the given value as reference. The copy is produced by
+        * {@code KryoUtils.copy} with a new {@code ValueSerializer} for the type.
+        */
+       @Override
+       public void setReference(T toCompare) {
+               checkKryoInitialized();
+
+               final ValueSerializer<T> serializer = new ValueSerializer<T>(type);
+               reference = KryoUtils.copy(toCompare, kryo, serializer);
+       }
+
+       @Override
+       public boolean equalToReference(T candidate) {
+               return candidate.equals(this.reference);
+       }
+
+       /**
+        * Compares the other comparator's reference to this comparator's reference.
+        *
+        * @param referencedComparator the other comparator; it is cast to {@code ValueComparator}
+        */
+       @Override
+       public int compareToReference(TypeComparator<T> referencedComparator) {
+               final ValueComparator<T> other = (ValueComparator<T>) referencedComparator;
+               final int order = other.reference.compareTo(reference);
+               if (ascendingComparison) {
+                       return order;
+               }
+               return -order;
+       }
+
+       @Override
+       public int compare(T first, T second) {
+               final int order = first.compareTo(second);
+               if (ascendingComparison) {
+                       return order;
+               }
+               return -order;
+       }
+
+       /**
+        * Reads one value from each input and compares them. The first value is read into
+        * the {@code reference} field, the second into {@code tempReference}; both objects
+        * are instantiated on first use.
+        */
+       @Override
+       public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+               if (reference == null) {
+                       reference = InstantiationUtil.instantiate(type, Value.class);
+               }
+               if (tempReference == null) {
+                       tempReference = InstantiationUtil.instantiate(type, Value.class);
+               }
+
+               reference.read(firstSource);
+               tempReference.read(secondSource);
+
+               final int order = reference.compareTo(tempReference);
+               if (ascendingComparison) {
+                       return order;
+               }
+               return -order;
+       }
+
+       /** @return {@code true} if the value class implements {@link NormalizableKey} */
+       @Override
+       public boolean supportsNormalizedKey() {
+               return NormalizableKey.class.isAssignableFrom(type);
+       }
+
+       /**
+        * Asks an instance of the value type for its maximal normalized key length. The
+        * instance is cast to {@link NormalizableKey}; if no reference object exists yet,
+        * one is instantiated first.
+        */
+       @Override
+       public int getNormalizeKeyLen() {
+               if (reference == null) {
+                       reference = InstantiationUtil.instantiate(type, Value.class);
+               }
+
+               return ((NormalizableKey<?>) reference).getMaxNormalizedKeyLen();
+       }
+
+       @Override
+       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+               return keyBytes < getNormalizeKeyLen();
+       }
+
+       /** Lets the record, cast to {@link NormalizableKey}, copy its normalized key into the segment. */
+       @Override
+       public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
+               ((NormalizableKey<?>) record).copyNormalizedKey(target, offset, numBytes);
+       }
+
+       /** @return {@code true} for descending order */
+       @Override
+       public boolean invertNormalizedKey() {
+               return !ascendingComparison;
+       }
+
+       /** @return a new comparator with the same order and type; the reference is not carried over */
+       @Override
+       public TypeComparator<T> duplicate() {
+               return new ValueComparator<T>(ascendingComparison, type);
+       }
+
+       /** Creates and configures the Kryo instance on first use; later calls do nothing. */
+       private void checkKryoInitialized() {
+               if (this.kryo != null) {
+                       return;
+               }
+
+               this.kryo = new Kryo();
+
+               final Kryo.DefaultInstantiatorStrategy strategy = new Kryo.DefaultInstantiatorStrategy();
+               strategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+               this.kryo.setInstantiatorStrategy(strategy);
+
+               this.kryo.setAsmEnabled(true);
+               this.kryo.register(type);
+       }
+
+       /** Stores the record itself at {@code target[index]} and reports one extracted key. */
+       @Override
+       public int extractKeys(Object record, Object[] target, int index) {
+               target[index] = record;
+               return 1;
+       }
+
+       /** @return a one-element array containing this comparator */
+       @SuppressWarnings("rawtypes")
+       @Override
+       public TypeComparator[] getFlatComparators() {
+               return comparators;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // unsupported normalization
+       // --------------------------------------------------------------------------------------------
+
+       /** @return always {@code false} */
+       @Override
+       public boolean supportsSerializationWithKeyNormalization() {
+               return false;
+       }
+
+       /** @throws UnsupportedOperationException always */
+       @Override
+       public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       /** @throws UnsupportedOperationException always */
+       @Override
+       public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
new file mode 100644
index 0000000..9329866
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.InstantiationUtil;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+/**
+ * {@link TypeSerializer} for classes that implement {@link Value}.
+ *
+ * <p>Serialization and deserialization are delegated to the value's own {@code write} and
+ * {@code read} methods. Object-to-object copies go through {@code KryoUtils.copy}, using a
+ * {@link Kryo} instance that is created on first use.
+ *
+ * @param <T> The type serialized.
+ */
+public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       /** The class of the values handled by this serializer; never null. */
+       private final Class<T> type;
+
+       /** Kryo instance used for object copies; transient and created lazily. */
+       private transient Kryo kryo;
+
+       /** Scratch instance used by {@link #copy(DataInputView, DataOutputView)}; created lazily. */
+       private transient T copyInstance;
+
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Creates a serializer for the given value class.
+        *
+        * @param type the value class; rejected by {@code Preconditions.checkNotNull} if null
+        */
+       public ValueSerializer(Class<T> type) {
+               this.type = Preconditions.checkNotNull(type);
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * @return always {@code false}
+        */
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       /**
+        * Creates a new serializer for the same value class. The new instance does not share
+        * this serializer's Kryo instance or scratch value.
+        */
+       @Override
+       public ValueSerializer<T> duplicate() {
+               return new ValueSerializer<T>(this.type);
+       }
+
+       /**
+        * Instantiates a new value of the handled class via {@code InstantiationUtil.instantiate}.
+        */
+       @Override
+       public T createInstance() {
+               return InstantiationUtil.instantiate(type);
+       }
+
+       /**
+        * Copies {@code from} using {@code KryoUtils.copy}, initializing Kryo first if necessary.
+        */
+       @Override
+       public T copy(T from) {
+               checkKryoInitialized();
+               final T duplicate = KryoUtils.copy(from, this.kryo, this);
+               return duplicate;
+       }
+
+       /**
+        * Copies {@code from} using the reuse-aware variant of {@code KryoUtils.copy},
+        * initializing Kryo first if necessary.
+        */
+       @Override
+       public T copy(T from, T reuse) {
+               checkKryoInitialized();
+               final T duplicate = KryoUtils.copy(from, reuse, this.kryo, this);
+               return duplicate;
+       }
+
+       /**
+        * @return always {@code -1}
+        */
+       // NOTE(review): -1 presumably marks a variable-length type; confirm against TypeSerializer.
+       @Override
+       public int getLength() {
+               return -1;
+       }
+
+       /**
+        * Serializes the value by calling its own {@code write} method on the target view.
+        */
+       @Override
+       public void serialize(T value, DataOutputView target) throws IOException {
+               value.write(target);
+       }
+
+       /**
+        * Deserializes into a freshly created instance.
+        */
+       @Override
+       public T deserialize(DataInputView source) throws IOException {
+               final T fresh = createInstance();
+               return deserialize(fresh, source);
+       }
+
+       /**
+        * Deserializes into {@code reuse} by calling its {@code read} method, and returns that
+        * same object.
+        */
+       @Override
+       public T deserialize(T reuse, DataInputView source) throws IOException {
+               reuse.read(source);
+               return reuse;
+       }
+
+       /**
+        * Copies one serialized value from {@code source} to {@code target} by reading it into
+        * a cached scratch instance and writing that instance out again.
+        */
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws IOException {
+               T scratch = this.copyInstance;
+               if (scratch == null) {
+                       // first use: create the scratch instance and cache it for later calls
+                       scratch = InstantiationUtil.instantiate(type);
+                       this.copyInstance = scratch;
+               }
+
+               scratch.read(source);
+               scratch.write(target);
+       }
+
+       /**
+        * Creates and configures the Kryo instance on first use; does nothing on later calls.
+        */
+       private void checkKryoInitialized() {
+               if (kryo != null) {
+                       return;
+               }
+
+               kryo = new Kryo();
+
+               // default strategy first, with the Objenesis-based strategy as fallback
+               Kryo.DefaultInstantiatorStrategy strategy = new Kryo.DefaultInstantiatorStrategy();
+               strategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+               kryo.setInstantiatorStrategy(strategy);
+
+               kryo.setAsmEnabled(true);
+               kryo.register(type);
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * The hash code is that of the handled value class.
+        */
+       @Override
+       public int hashCode() {
+               return type.hashCode();
+       }
+
+       /**
+        * Two value serializers are equal if the other one accepts this one via
+        * {@link #canEqual(Object)} and both handle the identical class object.
+        */
+       @Override
+       public boolean equals(Object obj) {
+               if (!(obj instanceof ValueSerializer)) {
+                       return false;
+               }
+
+               final ValueSerializer<?> that = (ValueSerializer<?>) obj;
+               return that.canEqual(this) && this.type == that.type;
+       }
+
+       /**
+        * @return {@code true} if {@code obj} is a {@code ValueSerializer}
+        */
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof ValueSerializer;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
new file mode 100644
index 0000000..a03369a
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.io.Writable;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+/**
+ * {@link TypeComparator} for types that are both {@link Writable} and {@link Comparable}.
+ *
+ * <p>Comparisons use the type's own {@code compareTo}, negated when the comparator was
+ * created for descending order. Normalized keys are offered only when the type also
+ * implements {@link NormalizableKey}.
+ *
+ * @param <T> The type compared.
+ */
+public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       /** The class of the compared records. */
+       private Class<T> type;
+
+       /** {@code true} for ascending order; {@code false} negates every comparison result. */
+       private final boolean ascendingComparison;
+
+       /** Reference record; set by {@link #setReference(Writable)} or created lazily. */
+       private transient T reference;
+
+       /** Second scratch record used by {@link #compareSerialized(DataInputView, DataInputView)}. */
+       private transient T tempReference;
+
+       /** Kryo instance used to copy the reference record; created lazily. */
+       private transient Kryo kryo;
+
+       /** Flat comparator array containing only this comparator. */
+       @SuppressWarnings("rawtypes")
+       private final TypeComparator[] comparators = new TypeComparator[] {this};
+
+       /**
+        * Creates a comparator for the given type and sort direction.
+        *
+        * @param ascending whether comparison results are returned as-is (true) or negated (false)
+        * @param type the class of the compared records
+        */
+       public WritableComparator(boolean ascending, Class<T> type) {
+               this.ascendingComparison = ascending;
+               this.type = type;
+       }
+
+       /**
+        * Hashes the record with its own {@code hashCode}.
+        */
+       @Override
+       public int hash(T record) {
+               return record.hashCode();
+       }
+
+       /**
+        * Stores a copy of {@code toCompare}, made via {@code KryoUtils.copy}, as the reference.
+        */
+       @Override
+       public void setReference(T toCompare) {
+               checkKryoInitialized();
+
+               final WritableSerializer<T> serializer = new WritableSerializer<T>(type);
+               this.reference = KryoUtils.copy(toCompare, this.kryo, serializer);
+       }
+
+       /**
+        * Tests the candidate against the stored reference using {@code equals}.
+        */
+       @Override
+       public boolean equalToReference(T candidate) {
+               return candidate.equals(this.reference);
+       }
+
+       /**
+        * Compares the other comparator's reference to this comparator's reference (the other
+        * reference is the receiver of {@code compareTo}), honoring the sort direction.
+        */
+       @Override
+       public int compareToReference(TypeComparator<T> referencedComparator) {
+               final WritableComparator<T> other = (WritableComparator<T>) referencedComparator;
+               return applySortOrder(other.reference.compareTo(this.reference));
+       }
+
+       /**
+        * Compares two records with {@code compareTo}, honoring the sort direction.
+        */
+       @Override
+       public int compare(T first, T second) {
+               return applySortOrder(first.compareTo(second));
+       }
+
+       /**
+        * Reads one record from each view and compares them, honoring the sort direction.
+        */
+       // NOTE(review): the first record is read into 'reference', so whatever setReference(...)
+       // stored before is overwritten; confirm callers never interleave the two usages.
+       @Override
+       public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+               ensureReferenceInstantiated();
+               ensureTempReferenceInstantiated();
+
+               this.reference.readFields(firstSource);
+               this.tempReference.readFields(secondSource);
+
+               return applySortOrder(this.reference.compareTo(this.tempReference));
+       }
+
+       /**
+        * @return {@code true} if the compared type implements {@link NormalizableKey}
+        */
+       @Override
+       public boolean supportsNormalizedKey() {
+               return NormalizableKey.class.isAssignableFrom(type);
+       }
+
+       /**
+        * Returns the maximal normalized key length reported by an instance of the type,
+        * instantiating the reference record if none exists yet.
+        */
+       @Override
+       public int getNormalizeKeyLen() {
+               ensureReferenceInstantiated();
+
+               return ((NormalizableKey<?>) this.reference).getMaxNormalizedKeyLen();
+       }
+
+       /**
+        * @return {@code true} if {@code keyBytes} is smaller than {@link #getNormalizeKeyLen()}
+        */
+       @Override
+       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+               return getNormalizeKeyLen() > keyBytes;
+       }
+
+       /**
+        * Lets the record, cast to {@link NormalizableKey}, write its normalized key into the
+        * target segment.
+        */
+       @Override
+       public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
+               ((NormalizableKey<?>) record).copyNormalizedKey(target, offset, numBytes);
+       }
+
+       /**
+        * @return {@code true} for descending comparators, {@code false} for ascending ones
+        */
+       @Override
+       public boolean invertNormalizedKey() {
+               return !ascendingComparison;
+       }
+
+       /**
+        * Creates a new comparator with the same type and sort direction. The new instance
+        * starts without reference records or Kryo state.
+        */
+       @Override
+       public TypeComparator<T> duplicate() {
+               return new WritableComparator<T>(this.ascendingComparison, this.type);
+       }
+
+       /**
+        * Stores the record itself as the single key at {@code target[index]}.
+        *
+        * @return always {@code 1}
+        */
+       @Override
+       public int extractKeys(Object record, Object[] target, int index) {
+               target[index] = record;
+               return 1;
+       }
+
+       /**
+        * Returns the array that contains only this comparator.
+        */
+       @SuppressWarnings("rawtypes")
+       @Override
+       public TypeComparator[] getFlatComparators() {
+               return this.comparators;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // unsupported normalization
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * @return always {@code false}
+        */
+       @Override
+       public boolean supportsSerializationWithKeyNormalization() {
+               return false;
+       }
+
+       /**
+        * Not supported.
+        *
+        * @throws UnsupportedOperationException always
+        */
+       @Override
+       public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       /**
+        * Not supported.
+        *
+        * @throws UnsupportedOperationException always
+        */
+       @Override
+       public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Returns the comparison result unchanged for ascending order and negated otherwise.
+        */
+       private int applySortOrder(int comparison) {
+               if (ascendingComparison) {
+                       return comparison;
+               }
+               return -comparison;
+       }
+
+       /**
+        * Creates and configures the Kryo instance on first use; does nothing on later calls.
+        */
+       private void checkKryoInitialized() {
+               if (kryo != null) {
+                       return;
+               }
+
+               kryo = new Kryo();
+
+               // default strategy first, with the Objenesis-based strategy as fallback
+               Kryo.DefaultInstantiatorStrategy strategy = new Kryo.DefaultInstantiatorStrategy();
+               strategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+               kryo.setInstantiatorStrategy(strategy);
+
+               kryo.setAsmEnabled(true);
+               kryo.register(type);
+       }
+
+       /**
+        * Instantiates a new record of the compared type via {@code InstantiationUtil}.
+        */
+       private T newRecord() {
+               return InstantiationUtil.instantiate(type, Writable.class);
+       }
+
+       /**
+        * Makes sure {@code reference} holds an instance, creating one if it is null.
+        */
+       private void ensureReferenceInstantiated() {
+               if (reference == null) {
+                       reference = newRecord();
+               }
+       }
+
+       /**
+        * Makes sure {@code tempReference} holds an instance, creating one if it is null.
+        */
+       private void ensureTempReferenceInstantiated() {
+               if (tempReference == null) {
+                       tempReference = newRecord();
+               }
+       }
+}

Reply via email to