http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
deleted file mode 100644
index 5187de7..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.typeinfo.AtomicType;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.CopyableValueComparator;
-import org.apache.flink.api.java.typeutils.runtime.CopyableValueSerializer;
-import org.apache.flink.api.java.typeutils.runtime.ValueComparator;
-import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.ByteValue;
-import org.apache.flink.types.CharValue;
-import org.apache.flink.types.CopyableValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.ShortValue;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-
-/**
- * Type information for data types that extend the {@link Value} interface. The value
- * interface allows types to define their custom serialization and deserialization routines.
- *
- * @param <T> The type of the class represented by this type information.
- */
-public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implements AtomicType<T> {
-
-       private static final long serialVersionUID = 1L;
-
-       public static final ValueTypeInfo<BooleanValue> BOOLEAN_VALUE_TYPE_INFO = new ValueTypeInfo<>(BooleanValue.class);
-       public static final ValueTypeInfo<ByteValue> BYTE_VALUE_TYPE_INFO = new ValueTypeInfo<>(ByteValue.class);
-       public static final ValueTypeInfo<CharValue> CHAR_VALUE_TYPE_INFO = new ValueTypeInfo<>(CharValue.class);
-       public static final ValueTypeInfo<DoubleValue> DOUBLE_VALUE_TYPE_INFO = new ValueTypeInfo<>(DoubleValue.class);
-       public static final ValueTypeInfo<FloatValue> FLOAT_VALUE_TYPE_INFO = new ValueTypeInfo<>(FloatValue.class);
-       public static final ValueTypeInfo<IntValue> INT_VALUE_TYPE_INFO = new ValueTypeInfo<>(IntValue.class);
-       public static final ValueTypeInfo<LongValue> LONG_VALUE_TYPE_INFO = new ValueTypeInfo<>(LongValue.class);
-       public static final ValueTypeInfo<NullValue> NULL_VALUE_TYPE_INFO = new ValueTypeInfo<>(NullValue.class);
-       public static final ValueTypeInfo<ShortValue> SHORT_VALUE_TYPE_INFO = new ValueTypeInfo<>(ShortValue.class);
-       public static final ValueTypeInfo<StringValue> STRING_VALUE_TYPE_INFO = new ValueTypeInfo<>(StringValue.class);
-
-       /** The value classes for which {@link #isBasicValueType()} answers {@code true}. */
-       private static final Class<?>[] BASIC_VALUE_TYPES = {
-               StringValue.class, ByteValue.class, ShortValue.class, CharValue.class,
-               DoubleValue.class, FloatValue.class, IntValue.class, LongValue.class,
-               NullValue.class, BooleanValue.class
-       };
-
-       /** The class described by this type information. */
-       private final Class<T> type;
-
-       /**
-        * Creates the type information for the given value class.
-        *
-        * @param type The class to describe; must not be null and must be assignable to {@link Value}.
-        */
-       public ValueTypeInfo(Class<T> type) {
-               Preconditions.checkNotNull(type);
-
-               // isAssignableFrom is reflexive, so Value.class itself passes this check as well
-               Preconditions.checkArgument(
-                       Value.class.isAssignableFrom(type),
-                       "ValueTypeInfo can only be used for subclasses of " + Value.class.getName());
-
-               this.type = type;
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  Type properties
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public boolean isBasicType() {
-               return false;
-       }
-
-       /**
-        * Checks whether the described class is exactly one of the built-in value types
-        * (string, byte, short, char, double, float, int, long, null, boolean).
-        *
-        * @return True if the described class is one of the built-in value classes, false otherwise.
-        */
-       public boolean isBasicValueType() {
-               for (Class<?> basicValueType : BASIC_VALUE_TYPES) {
-                       if (type.equals(basicValueType)) {
-                               return true;
-                       }
-               }
-               return false;
-       }
-
-       @Override
-       public boolean isTupleType() {
-               return false;
-       }
-
-       @Override
-       public int getArity() {
-               return 1;
-       }
-
-       @Override
-       public int getTotalFields() {
-               return 1;
-       }
-
-       @Override
-       public Class<T> getTypeClass() {
-               return this.type;
-       }
-
-       /**
-        * A value type can be used as a key exactly when its class implements {@link Comparable}.
-        */
-       @Override
-       public boolean isKeyType() {
-               return Comparable.class.isAssignableFrom(type);
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  Serializer and comparator factories
-       // --------------------------------------------------------------------------------------------
-
-       /**
-        * Creates a {@link CopyableValueSerializer} when the described class implements
-        * {@link CopyableValue}, and a {@link ValueSerializer} for every other value class.
-        */
-       @Override
-       @SuppressWarnings("unchecked")
-       public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
-               if (!CopyableValue.class.isAssignableFrom(type)) {
-                       return new ValueSerializer<T>(type);
-               }
-
-               return (TypeSerializer<T>) createCopyableValueSerializer(type.asSubclass(CopyableValue.class));
-       }
-
-       /**
-        * Creates a {@link CopyableValueComparator} when the described class implements
-        * {@link CopyableValue}, and a {@link ValueComparator} for every other value class.
-        *
-        * @throws RuntimeException If the described class is not a key type (see {@link #isKeyType()}).
-        */
-       @SuppressWarnings({ "unchecked", "rawtypes" })
-       @Override
-       public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
-               if (!isKeyType()) {
-                       throw new RuntimeException("The type " + type.getName() + " is not Comparable.");
-               }
-
-               if (!CopyableValue.class.isAssignableFrom(type)) {
-                       return (TypeComparator<T>) new ValueComparator(sortOrderAscending, type);
-               }
-
-               return (TypeComparator<T>) new CopyableValueComparator(sortOrderAscending, type);
-       }
-
-       // generic helper whose only purpose is to introduce the self-referential bound that
-       // CopyableValueSerializer requires
-       private static <X extends CopyableValue<X>> CopyableValueSerializer<X> createCopyableValueSerializer(Class<X> clazz) {
-               return new CopyableValueSerializer<X>(clazz);
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  Object methods
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public int hashCode() {
-               return this.type.hashCode();
-       }
-
-       /**
-        * Two value type infos are equal if both agree on {@link #canEqual(Object)} and describe
-        * the very same class.
-        */
-       @Override
-       public boolean equals(Object obj) {
-               if (!(obj instanceof ValueTypeInfo)) {
-                       return false;
-               }
-
-               ValueTypeInfo<?> that = (ValueTypeInfo<?>) obj;
-               return that.canEqual(this) && this.type == that.type;
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof ValueTypeInfo;
-       }
-
-       @Override
-       public String toString() {
-               return "ValueType<" + type.getSimpleName() + ">";
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  Static factory
-       // --------------------------------------------------------------------------------------------
-
-       /**
-        * Creates the type information for a proper subtype of {@link Value}.
-        *
-        * @param typeClass The class to describe.
-        * @return A new {@code ValueTypeInfo} for the given class.
-        * @throws InvalidTypesException If the class is not assignable to {@link Value}, or is
-        *                               {@code Value.class} itself.
-        */
-       static <X extends Value> TypeInformation<X> getValueTypeInfo(Class<X> typeClass) {
-               final boolean properValueSubtype =
-                       Value.class.isAssignableFrom(typeClass) && !typeClass.equals(Value.class);
-
-               if (!properValueSubtype) {
-                       throw new InvalidTypesException("The given class is no subclass of " + Value.class.getName());
-               }
-
-               return new ValueTypeInfo<X>(typeClass);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
deleted file mode 100644
index 6c140d9..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.typeinfo.AtomicType;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.WritableComparator;
-import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Type information for data types that extend Hadoop's {@link Writable} interface. The Writable
- * interface defines the serialization and deserialization routines for the data type.
- *
- * @param <T> The type of the class represented by this type information.
- */
-public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {
-
-       private static final long serialVersionUID = 1L;
-
-       /** The Writable class described by this type information. */
-       private final Class<T> typeClass;
-
-       /**
-        * Creates the type information for the given Writable class.
-        *
-        * @param typeClass The class to describe; must not be null, must be assignable to
-        *                  {@link Writable}, and must not be {@code Writable.class} itself.
-        */
-       public WritableTypeInfo(Class<T> typeClass) {
-               Preconditions.checkNotNull(typeClass);
-
-               final boolean properWritableSubtype =
-                       Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class);
-
-               Preconditions.checkArgument(
-                       properWritableSubtype,
-                       "WritableTypeInfo can only be used for subclasses of " + Writable.class.getName());
-
-               this.typeClass = typeClass;
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  Type properties
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public boolean isBasicType() {
-               return false;
-       }
-
-       @Override
-       public boolean isTupleType() {
-               return false;
-       }
-
-       @Override
-       public int getArity() {
-               return 1;
-       }
-
-       @Override
-       public int getTotalFields() {
-               return 1;
-       }
-
-       @Override
-       public Class<T> getTypeClass() {
-               return this.typeClass;
-       }
-
-       /**
-        * A Writable type can be used as a key exactly when its class implements {@link Comparable}.
-        */
-       @Override
-       public boolean isKeyType() {
-               return Comparable.class.isAssignableFrom(typeClass);
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  Serializer and comparator factories
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
-               return new WritableSerializer<T>(typeClass);
-       }
-
-       /**
-        * Creates a {@link WritableComparator} for the described class.
-        *
-        * @throws UnsupportedOperationException If the described class does not implement {@link Comparable}.
-        */
-       @SuppressWarnings({ "rawtypes", "unchecked" })
-       @Override
-       public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
-               if (!Comparable.class.isAssignableFrom(typeClass)) {
-                       throw new UnsupportedOperationException("Cannot create Comparator for " + typeClass.getCanonicalName() + ". " +
-                               "Class does not implement Comparable interface.");
-               }
-
-               return new WritableComparator(sortOrderAscending, typeClass);
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  Object methods
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public String toString() {
-               return "WritableType<" + typeClass.getName() + ">";
-       }
-
-       @Override
-       public int hashCode() {
-               return typeClass.hashCode();
-       }
-
-       /**
-        * Two Writable type infos are equal if both agree on {@link #canEqual(Object)} and describe
-        * the very same class.
-        */
-       @Override
-       public boolean equals(Object obj) {
-               if (!(obj instanceof WritableTypeInfo)) {
-                       return false;
-               }
-
-               WritableTypeInfo<?> that = (WritableTypeInfo<?>) obj;
-               return that.canEqual(this) && this.typeClass == that.typeClass;
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof WritableTypeInfo;
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  Static factory
-       // --------------------------------------------------------------------------------------------
-
-       /**
-        * Creates the type information for a proper subtype of {@link Writable}.
-        *
-        * @param typeClass The class to describe.
-        * @return A new {@code WritableTypeInfo} for the given class.
-        * @throws InvalidTypesException If the class is not assignable to {@link Writable}, or is
-        *                               {@code Writable.class} itself.
-        */
-       static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass) {
-               final boolean properWritableSubtype =
-                       Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class);
-
-               if (!properWritableSubtype) {
-                       throw new InvalidTypesException("The given class is no subclass of " + Writable.class.getName());
-               }
-
-               return new WritableTypeInfo<T>(typeClass);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
deleted file mode 100644
index bc04367..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-
-import com.google.common.base.Preconditions;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.util.Utf8;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
-
-import com.esotericsoftware.kryo.Kryo;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-
-/**
- * General purpose serialization. Currently using Apache Avro's Reflect-serializers for serialization and
- * Kryo for deep object copies. We want to change this to Kryo-only.
- *
- * <p>The Avro reader/writer and the Kryo instance are transient and created lazily on first use,
- * so they are re-created after this serializer has itself been (Java-)deserialized.
- *
- * @param <T> The type serialized.
- */
-public final class AvroSerializer<T> extends TypeSerializer<T> {
-
-       private static final long serialVersionUID = 1L;
-
-       /** The type handed to the Avro reflect reader/writer and registered with Kryo. */
-       private final Class<T> type;
-
-       /** The concrete class that is instantiated whenever a fresh instance is required. */
-       private final Class<? extends T> typeToInstantiate;
-
-       private transient ReflectDatumWriter<T> writer;
-       private transient ReflectDatumReader<T> reader;
-
-       private transient DataOutputEncoder encoder;
-       private transient DataInputDecoder decoder;
-
-       private transient Kryo kryo;
-
-       /** Reusable instance for the stream-to-stream copy; created lazily. */
-       private transient T deepCopyInstance;
-
-       // --------------------------------------------------------------------------------------------
-
-       /**
-        * Creates a serializer that serializes and instantiates the same class.
-        *
-        * @param type The class to serialize; must not be null.
-        */
-       public AvroSerializer(Class<T> type) {
-               this(type, type);
-       }
-
-       /**
-        * Creates a serializer for {@code type} that instantiates {@code typeToInstantiate}
-        * whenever a new instance is needed.
-        *
-        * @param type              The class to serialize; must not be null.
-        * @param typeToInstantiate The class to instantiate; must not be null and must pass
-        *                          {@link InstantiationUtil#checkForInstantiation(Class)}.
-        */
-       public AvroSerializer(Class<T> type, Class<? extends T> typeToInstantiate) {
-               this.type = Preconditions.checkNotNull(type);
-               this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate);
-
-               InstantiationUtil.checkForInstantiation(typeToInstantiate);
-       }
-
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public boolean isImmutableType() {
-               return false;
-       }
-
-       /**
-        * Returns a new serializer for the same classes. The lazily created (transient) helpers
-        * are not shared with the duplicate.
-        */
-       @Override
-       public AvroSerializer<T> duplicate() {
-               return new AvroSerializer<T>(type, typeToInstantiate);
-       }
-
-       @Override
-       public T createInstance() {
-               return InstantiationUtil.instantiate(this.typeToInstantiate);
-       }
-
-       /** Copies the given object via Kryo (delegates to {@code KryoUtils.copy}). */
-       @Override
-       public T copy(T from) {
-               checkKryoInitialized();
-
-               return KryoUtils.copy(from, kryo, this);
-       }
-
-       /** Copies the given object via Kryo, handing the reuse instance to {@code KryoUtils.copy}. */
-       @Override
-       public T copy(T from, T reuse) {
-               checkKryoInitialized();
-
-               return KryoUtils.copy(from, reuse, kryo, this);
-       }
-
-       @Override
-       public int getLength() {
-               return -1;
-       }
-
-       @Override
-       public void serialize(T value, DataOutputView target) throws IOException {
-               checkAvroInitialized();
-               this.encoder.setOut(target);
-               this.writer.write(value, this.encoder);
-       }
-
-       @Override
-       public T deserialize(DataInputView source) throws IOException {
-               checkAvroInitialized();
-               this.decoder.setIn(source);
-               return this.reader.read(null, this.decoder);
-       }
-
-       @Override
-       public T deserialize(T reuse, DataInputView source) throws IOException {
-               checkAvroInitialized();
-               this.decoder.setIn(source);
-               return this.reader.read(reuse, this.decoder);
-       }
-
-       /**
-        * Copies one record from {@code source} to {@code target} by reading it with the Avro
-        * reader into a reusable instance and writing it back with the Avro writer.
-        */
-       @Override
-       public void copy(DataInputView source, DataOutputView target) throws IOException {
-               checkAvroInitialized();
-
-               if (this.deepCopyInstance == null) {
-                       // Instantiate the class that the constructor validated as instantiable, exactly as
-                       // createInstance() does. 'type' is never checked for instantiability, so
-                       // instantiating it here could fail where createInstance() succeeds.
-                       this.deepCopyInstance = InstantiationUtil.instantiate(this.typeToInstantiate);
-               }
-
-               this.decoder.setIn(source);
-               this.encoder.setOut(target);
-
-               T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
-               this.writer.write(tmp, this.encoder);
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  Lazy initialization of the transient helpers
-       // --------------------------------------------------------------------------------------------
-
-       /** Creates the Avro reader, writer, encoder and decoder if they do not exist yet. */
-       private void checkAvroInitialized() {
-               if (this.reader == null) {
-                       this.reader = new ReflectDatumReader<T>(type);
-                       this.writer = new ReflectDatumWriter<T>(type);
-                       this.encoder = new DataOutputEncoder();
-                       this.decoder = new DataInputDecoder();
-               }
-       }
-
-       /** Creates and configures the Kryo instance used for object copies if it does not exist yet. */
-       private void checkKryoInitialized() {
-               if (this.kryo == null) {
-                       this.kryo = new Kryo();
-
-                       // use Kryo's default instantiation and fall back to Objenesis' standard strategy
-                       Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
-                       instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
-                       kryo.setInstantiatorStrategy(instantiatorStrategy);
-
-                       // register Avro types.
-                       this.kryo.register(GenericData.Array.class, new Serializers.SpecificInstanceCollectionSerializerForArrayList());
-                       this.kryo.register(Utf8.class);
-                       this.kryo.register(GenericData.EnumSymbol.class);
-                       this.kryo.register(GenericData.Fixed.class);
-                       this.kryo.register(GenericData.StringType.class);
-                       this.kryo.setAsmEnabled(true);
-                       this.kryo.register(type);
-               }
-       }
-
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public int hashCode() {
-               return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode();
-       }
-
-       /**
-        * Two Avro serializers are equal if they serialize the same class and instantiate the
-        * same class.
-        */
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof AvroSerializer) {
-                       @SuppressWarnings("unchecked")
-                       AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj;
-
-                       return avroSerializer.canEqual(this) &&
-                               type == avroSerializer.type &&
-                               typeToInstantiate == avroSerializer.typeToInstantiate;
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof AvroSerializer;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
deleted file mode 100644
index 9b3b191..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.types.CopyableValue;
-import org.apache.flink.types.NormalizableKey;
-import org.apache.flink.util.InstantiationUtil;
-
-/**
- * Comparator for all types that implement both {@link CopyableValue} and {@link Comparable}.
- *
- * <p>Comparisons delegate to the type's own {@code compareTo} method; the result is negated
- * for descending order. Normalized keys are supported if the type additionally implements
- * {@link NormalizableKey}.
- *
- * @param <T> The type compared by this comparator.
- */
-public class CopyableValueComparator<T extends CopyableValue<T> & Comparable<T>> extends TypeComparator<T> {
-
-       private static final long serialVersionUID = 1L;
-
-       private final Class<T> type;
-
-       private final boolean ascendingComparison;
-
-       /** Holds a copy of the element passed to {@link #setReference(CopyableValue)}. */
-       private transient T reference;
-
-       /** Scratch instances for {@link #compareSerialized(DataInputView, DataInputView)}; created lazily. */
-       private transient T firstScratch;
-
-       private transient T secondScratch;
-
-       private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
-
-       /**
-        * Creates a comparator for the given type.
-        *
-        * @param ascending True to sort in ascending order, false for descending order.
-        * @param type      The class of the compared values; an instance of it is created
-        *                  immediately to hold the reference.
-        */
-       public CopyableValueComparator(boolean ascending, Class<T> type) {
-               this.type = type;
-               this.ascendingComparison = ascending;
-               this.reference = InstantiationUtil.instantiate(type, CopyableValue.class);
-       }
-
-       @Override
-       public int hash(T record) {
-               return record.hashCode();
-       }
-
-       /** Copies the given element into the internal reference instance. */
-       @Override
-       public void setReference(T toCompare) {
-               toCompare.copyTo(reference);
-       }
-
-       @Override
-       public boolean equalToReference(T candidate) {
-               return candidate.equals(this.reference);
-       }
-
-       /**
-        * Compares the reference of the given comparator against this comparator's reference.
-        * The given comparator must be a {@code CopyableValueComparator}.
-        */
-       @Override
-       public int compareToReference(TypeComparator<T> referencedComparator) {
-               T otherRef = ((CopyableValueComparator<T>) referencedComparator).reference;
-               int comp = otherRef.compareTo(reference);
-               return ascendingComparison ? comp : -comp;
-       }
-
-       @Override
-       public int compare(T first, T second) {
-               int comp = first.compareTo(second);
-               return ascendingComparison ? comp : -comp;
-       }
-
-       /**
-        * Reads one value from each input and compares the two.
-        *
-        * <p>Both values are read into dedicated scratch instances, so the element stored via
-        * {@link #setReference(CopyableValue)} is left untouched by this call.
-        */
-       @Override
-       public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
-               if (firstScratch == null) {
-                       firstScratch = InstantiationUtil.instantiate(type, CopyableValue.class);
-               }
-               if (secondScratch == null) {
-                       secondScratch = InstantiationUtil.instantiate(type, CopyableValue.class);
-               }
-
-               // deliberately not deserializing into 'reference': that would silently overwrite the
-               // value that equalToReference / compareToReference are supposed to compare against
-               firstScratch.read(firstSource);
-               secondScratch.read(secondSource);
-               int comp = firstScratch.compareTo(secondScratch);
-               return ascendingComparison ? comp : -comp;
-       }
-
-       /** Normalized keys are supported if the compared type implements {@link NormalizableKey}. */
-       @Override
-       public boolean supportsNormalizedKey() {
-               return NormalizableKey.class.isAssignableFrom(type);
-       }
-
-       /**
-        * Returns the maximal normalized key length reported by the type. Fails with a
-        * {@link ClassCastException} if the type does not implement {@link NormalizableKey},
-        * see {@link #supportsNormalizedKey()}.
-        */
-       @Override
-       public int getNormalizeKeyLen() {
-               NormalizableKey<?> key = (NormalizableKey<?>) reference;
-               return key.getMaxNormalizedKeyLen();
-       }
-
-       @Override
-       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-               return keyBytes < getNormalizeKeyLen();
-       }
-
-       @Override
-       public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
-               NormalizableKey<?> key = (NormalizableKey<?>) record;
-               key.copyNormalizedKey(target, offset, numBytes);
-       }
-
-       @Override
-       public boolean invertNormalizedKey() {
-               return !ascendingComparison;
-       }
-
-       /** Returns a new comparator with the same order and type, holding its own reference instance. */
-       @Override
-       public TypeComparator<T> duplicate() {
-               return new CopyableValueComparator<T>(ascendingComparison, type);
-       }
-
-       /** The record itself is the single key. */
-       @Override
-       public int extractKeys(Object record, Object[] target, int index) {
-               target[index] = record;
-               return 1;
-       }
-
-       @Override
-       public TypeComparator<?>[] getFlatComparators() {
-               return comparators;
-       }
-
-       // --------------------------------------------------------------------------------------------
-       // unsupported normalization
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public boolean supportsSerializationWithKeyNormalization() {
-               return false;
-       }
-
-       @Override
-       public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
-               throw new UnsupportedOperationException();
-       }
-
-       // --------------------------------------------------------------------------------------------
-       // serialization
-       // --------------------------------------------------------------------------------------------
-
-       /**
-        * Restores the non-transient state and re-creates the transient reference instance.
-        */
-       private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException {
-               // read basic object and the type
-               s.defaultReadObject();
-
-               this.reference = InstantiationUtil.instantiate(type, CopyableValue.class);
-               // the scratch instances are re-created lazily on the next compareSerialized call
-               this.firstScratch = null;
-               this.secondScratch = null;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
deleted file mode 100644
index 9e46f27..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.CopyableValue;
-import org.apache.flink.util.InstantiationUtil;
-
-
-public class CopyableValueSerializer<T extends CopyableValue<T>> extends 
TypeSerializer<T> {
-
-       private static final long serialVersionUID = 1L;
-       
-       
-       private final Class<T> valueClass;
-       
-       private transient T instance;
-       
-       
-       public CopyableValueSerializer(Class<T> valueClass) {
-               this.valueClass = Preconditions.checkNotNull(valueClass);
-       }
-
-       @Override
-       public boolean isImmutableType() {
-               return false;
-       }
-
-       @Override
-       public CopyableValueSerializer<T> duplicate() {
-               return this;
-       }
-
-       @Override
-       public T createInstance() {
-               return InstantiationUtil.instantiate(this.valueClass);
-       }
-       
-       @Override
-       public T copy(T from) {
-               return copy(from, createInstance());
-       }
-       
-       @Override
-       public T copy(T from, T reuse) {
-               from.copyTo(reuse);
-               return reuse;
-       }
-
-       @Override
-       public int getLength() {
-               ensureInstanceInstantiated();
-               return instance.getBinaryLength();
-       }
-
-       @Override
-       public void serialize(T value, DataOutputView target) throws 
IOException {
-               value.write(target);
-       }
-
-       @Override
-       public T deserialize(DataInputView source) throws IOException {
-               return deserialize(createInstance(), source);
-       }
-       
-       @Override
-       public T deserialize(T reuse, DataInputView source) throws IOException {
-               reuse.read(source);
-               return reuse;
-       }
-
-       @Override
-       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-               ensureInstanceInstantiated();
-               instance.copy(source, target);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       private void ensureInstanceInstantiated() {
-               if (instance == null) {
-                       instance = createInstance();
-               }
-       }
-       
-       @Override
-       public int hashCode() {
-               return this.valueClass.hashCode();
-       }
-       
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof CopyableValueSerializer) {
-                       @SuppressWarnings("unchecked")
-                       CopyableValueSerializer<T> copyableValueSerializer = 
(CopyableValueSerializer<T>) obj;
-
-                       return copyableValueSerializer.canEqual(this) &&
-                               valueClass == 
copyableValueSerializer.valueClass;
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof CopyableValueSerializer;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java
deleted file mode 100644
index e48f9fa..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.avro.io.Decoder;
-import org.apache.avro.util.Utf8;
-
-
-public class DataInputDecoder extends Decoder implements java.io.Serializable {
-       
-       private static final long serialVersionUID = 1L;
-       
-       private transient Utf8 stringDecoder = new Utf8();
-       
-       
-       private transient DataInput in;
-       
-       
-       public void setIn(DataInput in) {
-               this.in = in;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // primitives
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public void readNull() {}
-       
-
-       @Override
-       public boolean readBoolean() throws IOException {
-               return in.readBoolean();
-       }
-
-       @Override
-       public int readInt() throws IOException {
-               return in.readInt();
-       }
-
-       @Override
-       public long readLong() throws IOException {
-               return in.readLong();
-       }
-
-       @Override
-       public float readFloat() throws IOException {
-               return in.readFloat();
-       }
-
-       @Override
-       public double readDouble() throws IOException {
-               return in.readDouble();
-       }
-       
-       @Override
-       public int readEnum() throws IOException {
-               return readInt();
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // bytes
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void readFixed(byte[] bytes, int start, int length) throws 
IOException {
-               in.readFully(bytes, start, length);
-       }
-       
-       @Override
-       public ByteBuffer readBytes(ByteBuffer old) throws IOException {
-               int length = readInt();
-               ByteBuffer result;
-               if (old != null && length <= old.capacity() && old.hasArray()) {
-                       result = old;
-                       result.clear();
-               } else {
-                       result = ByteBuffer.allocate(length);
-               }
-               in.readFully(result.array(), result.arrayOffset() + 
result.position(), length);
-               result.limit(length);
-               return result;
-       }
-       
-       
-       @Override
-       public void skipFixed(int length) throws IOException {
-               skipBytes(length);
-       }
-       
-       @Override
-       public void skipBytes() throws IOException {
-               int num = readInt();
-               skipBytes(num);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // strings
-       // 
--------------------------------------------------------------------------------------------
-       
-       
-       @Override
-       public Utf8 readString(Utf8 old) throws IOException {
-               int length = readInt();
-               Utf8 result = (old != null ? old : new Utf8());
-               result.setByteLength(length);
-               
-               if (length > 0) {
-                       in.readFully(result.getBytes(), 0, length);
-               }
-               
-               return result;
-       }
-
-       @Override
-       public String readString() throws IOException {
-               return readString(stringDecoder).toString();
-       }
-
-       @Override
-       public void skipString() throws IOException {
-               int len = readInt();
-               skipBytes(len);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // collection types
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public long readArrayStart() throws IOException {
-               return readVarLongCount(in);
-       }
-
-       @Override
-       public long arrayNext() throws IOException {
-               return readVarLongCount(in);
-       }
-
-       @Override
-       public long skipArray() throws IOException {
-               return readVarLongCount(in);
-       }
-
-       @Override
-       public long readMapStart() throws IOException {
-               return readVarLongCount(in);
-       }
-
-       @Override
-       public long mapNext() throws IOException {
-               return readVarLongCount(in);
-       }
-
-       @Override
-       public long skipMap() throws IOException {
-               return readVarLongCount(in);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // union
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public int readIndex() throws IOException {
-               return readInt();
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // utils
-       // 
--------------------------------------------------------------------------------------------
-       
-       private void skipBytes(int num) throws IOException {
-               while (num > 0) {
-                       num -= in.skipBytes(num);
-               }
-       }
-       
-       public static long readVarLongCount(DataInput in) throws IOException {
-               long value = in.readUnsignedByte();
-
-               if ((value & 0x80) == 0) {
-                       return value;
-               }
-               else {
-                       long curr;
-                       int shift = 7;
-                       value = value & 0x7f;
-                       while (((curr = in.readUnsignedByte()) & 0x80) != 0){
-                               value |= (curr & 0x7f) << shift;
-                               shift += 7;
-                       }
-                       value |= curr << shift;
-                       return value;
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // serialization
-       // 
--------------------------------------------------------------------------------------------
-       
-       private void readObject(java.io.ObjectInputStream s) throws 
java.io.IOException, ClassNotFoundException {
-               // Read in size, and any hidden stuff
-               s.defaultReadObject();
-               
-               this.stringDecoder = new Utf8();
-               this.in = null;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java
deleted file mode 100644
index be17d64..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.core.memory.DataInputView;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * An input stream that draws its data from a {@link DataInputView}.
- */
-public class DataInputViewStream extends InputStream {
-
-       /** The view that all reads and skips are forwarded to. */
-       protected DataInputView inputView;
-
-       /**
-        * Creates a stream that reads from the given view.
-        *
-        * @param inputView the view to read from
-        */
-       public DataInputViewStream(DataInputView inputView) {
-               this.inputView = inputView;
-       }
-
-       /** @return the view this stream reads from */
-       public DataInputView getInputView(){
-               return inputView;
-       }
-
-       /**
-        * Reads a single byte.
-        *
-        * @return the byte as a value in the range 0..255, or -1 if the view signals the end of its
-        *         data with an {@link EOFException}
-        */
-       @Override
-       public int read() throws IOException {
-               try {
-                       return inputView.readUnsignedByte();
-               } catch(EOFException ex) {
-                       return -1;
-               }
-       }
-
-       /**
-        * Skips up to {@code n} bytes. The view can skip at most {@link Integer#MAX_VALUE} bytes per
-        * call, so larger requests are split into several calls.
-        *
-        * @param n the number of bytes to skip
-        * @return the number of bytes actually skipped; 0 if {@code n} is not positive
-        */
-       @Override
-       public long skip(long n) throws IOException {
-               if (n <= 0) {
-                       return 0;
-               }
-
-               long remaining = n;
-               while (remaining > Integer.MAX_VALUE) {
-                       int skippedBytes = inputView.skipBytes(Integer.MAX_VALUE);
-
-                       if (skippedBytes == 0) {
-                               return n - remaining;
-                       }
-
-                       remaining -= skippedBytes;
-               }
-
-               // the bytes skipped by the last call reduce what remains; the result is the
-               // requested amount minus whatever could not be skipped
-               remaining -= inputView.skipBytes((int) remaining);
-               return n - remaining;
-       }
-
-       @Override
-       public int read(byte[] b, int off, int len) throws IOException {
-               return inputView.read(b, off, len);
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java
deleted file mode 100644
index 5c89962..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.avro.io.Encoder;
-import org.apache.avro.util.Utf8;
-
-
-/**
- * An Avro {@link Encoder} that writes its data to a {@link DataOutput}. Ints, longs, enums and
- * union indexes are written in the fixed-width {@link DataOutput} format; only array/map block
- * counts use a variable-length encoding (see {@link #writeVarLongCount(DataOutput, long)}).
- */
-public final class DataOutputEncoder extends Encoder implements java.io.Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       /** The output to encode to; must be set via {@link #setOut(DataOutput)} before writing. */
-       private transient DataOutput out;
-
-       /**
-        * Sets the output that all subsequent write calls encode to.
-        *
-        * @param out the data output to write to
-        */
-       public void setOut(DataOutput out) {
-               this.out = out;
-       }
-
-       /** Does nothing; this encoder keeps no buffered data of its own. */
-       @Override
-       public void flush() throws IOException {}
-
-       // --------------------------------------------------------------------------------------------
-       // primitives
-       // --------------------------------------------------------------------------------------------
-
-       /** Nulls occupy no bytes, so there is nothing to write. */
-       @Override
-       public void writeNull() {}
-
-       @Override
-       public void writeBoolean(boolean b) throws IOException {
-               out.writeBoolean(b);
-       }
-
-       @Override
-       public void writeInt(int n) throws IOException {
-               out.writeInt(n);
-       }
-
-       @Override
-       public void writeLong(long n) throws IOException {
-               out.writeLong(n);
-       }
-
-       @Override
-       public void writeFloat(float f) throws IOException {
-               out.writeFloat(f);
-       }
-
-       @Override
-       public void writeDouble(double d) throws IOException {
-               out.writeDouble(d);
-       }
-
-       /** Enum ordinals are stored as plain ints. */
-       @Override
-       public void writeEnum(int e) throws IOException {
-               out.writeInt(e);
-       }
-
-       // --------------------------------------------------------------------------------------------
-       // bytes
-       // --------------------------------------------------------------------------------------------
-
-       /** Writes the given range of bytes without a length prefix. */
-       @Override
-       public void writeFixed(byte[] bytes, int start, int len) throws IOException {
-               out.write(bytes, start, len);
-       }
-
-       /** Writes the length as an int, followed by the bytes if there are any. */
-       @Override
-       public void writeBytes(byte[] bytes, int start, int len) throws IOException {
-               out.writeInt(len);
-               if (len <= 0) {
-                       return;
-               }
-               out.write(bytes, start, len);
-       }
-
-       /** Writes the number of remaining bytes as an int, followed by those bytes if there are any. */
-       @Override
-       public void writeBytes(ByteBuffer bytes) throws IOException {
-               final int remaining = bytes.remaining();
-               out.writeInt(remaining);
-
-               if (remaining <= 0) {
-                       return;
-               }
-               writeFixed(bytes);
-       }
-
-       // --------------------------------------------------------------------------------------------
-       // strings
-       // --------------------------------------------------------------------------------------------
-
-       /** Writes the string as length-prefixed UTF-8 bytes. */
-       @Override
-       public void writeString(String str) throws IOException {
-               final byte[] encoded = Utf8.getBytesFor(str);
-               writeBytes(encoded, 0, encoded.length);
-       }
-
-       /** Writes the used part of the {@link Utf8} buffer as length-prefixed bytes. */
-       @Override
-       public void writeString(Utf8 utf8) throws IOException {
-               writeBytes(utf8.getBytes(), 0, utf8.getByteLength());
-       }
-
-       // --------------------------------------------------------------------------------------------
-       // collection types
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public void writeArrayStart() {}
-
-       /** Writes the count of a block of items; counts that are not positive are not written. */
-       @Override
-       public void setItemCount(long itemCount) throws IOException {
-               if (itemCount <= 0) {
-                       return;
-               }
-               writeVarLongCount(out, itemCount);
-       }
-
-       @Override
-       public void startItem() {}
-
-       @Override
-       public void writeArrayEnd() throws IOException {
-               // a single zero byte is the variable-length encoding of the count 0
-               out.write(0);
-       }
-
-       @Override
-       public void writeMapStart() {}
-
-       @Override
-       public void writeMapEnd() throws IOException {
-               // a single zero byte is the variable-length encoding of the count 0
-               out.write(0);
-       }
-
-       // --------------------------------------------------------------------------------------------
-       // union
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public void writeIndex(int unionIndex) throws IOException {
-               out.writeInt(unionIndex);
-       }
-
-       // --------------------------------------------------------------------------------------------
-       // utils
-       // --------------------------------------------------------------------------------------------
-
-       /**
-        * Writes a non-negative count in a variable-length encoding: seven payload bits per byte,
-        * least significant group first, with the high bit set on every byte except the last.
-        *
-        * @param out the output to write to
-        * @param val the count to write
-        * @throws IOException if {@code val} is negative or writing fails
-        */
-       public static final void writeVarLongCount(DataOutput out, long val) throws IOException {
-               if (val < 0) {
-                       throw new IOException("Illegal count (must be non-negative): " + val);
-               }
-
-               long rest = val;
-               // the value is non-negative here, so "more than seven bits left" is a plain comparison
-               while (rest > 0x7FL) {
-                       out.write(((int) rest) | 0x80);
-                       rest >>>= 7;
-               }
-               out.write((int) rest);
-       }
-
-       /** Java deserialization hook: the transient output is cleared and must be set again. */
-       private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException {
-               // Read in size, and any hidden stuff
-               s.defaultReadObject();
-
-               this.out = null;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputViewStream.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputViewStream.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputViewStream.java
deleted file mode 100644
index 66f2af6..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputViewStream.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.core.memory.DataOutputView;
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * An output stream that forwards everything written to it to a {@link DataOutputView}.
- */
-public class DataOutputViewStream extends OutputStream {
-
-       /** The view that receives all written bytes. */
-       protected DataOutputView outputView;
-
-       /**
-        * Creates a stream that writes to the given view.
-        *
-        * @param outputView the view to write to
-        */
-       public DataOutputViewStream(DataOutputView outputView){
-               this.outputView = outputView;
-       }
-
-       /** Forwards the value to the view's {@code writeByte}. */
-       @Override
-       public void write(int b) throws IOException {
-               this.outputView.writeByte(b);
-       }
-
-       /** Forwards the given range of the array to the view in a single call. */
-       @Override
-       public void write(byte[] b, int off, int len) throws IOException {
-               this.outputView.write(b, off, len);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
deleted file mode 100644
index b4b95f3..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import static org.apache.flink.api.java.typeutils.Either.Left;
-import static org.apache.flink.api.java.typeutils.Either.Right;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.Either;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * A {@link TypeSerializer} for the {@link Either} type. A record is written as a boolean flag
- * ({@code true} for a Left value, {@code false} for a Right value) followed by the value itself,
- * serialized with the serializer of the corresponding side.
- *
- * @param <L> the Left value type
- * @param <R> the Right value type
- */
-public class EitherSerializer<L, R> extends TypeSerializer<Either<L, R>> {
-
-       private static final long serialVersionUID = 1L;
-
-       /** Serializer for Left values. */
-       private final TypeSerializer<L> leftSerializer;
-
-       /** Serializer for Right values. */
-       private final TypeSerializer<R> rightSerializer;
-
-       /**
-        * Creates a serializer from the serializers of the two sides.
-        *
-        * @param leftSerializer the serializer for Left values
-        * @param rightSerializer the serializer for Right values
-        */
-       public EitherSerializer(TypeSerializer<L> leftSerializer, TypeSerializer<R> rightSerializer) {
-               this.leftSerializer = leftSerializer;
-               this.rightSerializer = rightSerializer;
-       }
-
-       /** @return {@code true} only if both sides are immutable types */
-       @Override
-       public boolean isImmutableType() {
-               return leftSerializer.isImmutableType() && rightSerializer.isImmutableType();
-       }
-
-       /**
-        * Duplicates both nested serializers.
-        *
-        * @return this instance if both nested serializers returned themselves, otherwise a new
-        *         serializer built from the two duplicates
-        */
-       @Override
-       public TypeSerializer<Either<L, R>> duplicate() {
-               final TypeSerializer<L> leftCopy = leftSerializer.duplicate();
-               final TypeSerializer<R> rightCopy = rightSerializer.duplicate();
-
-               final boolean unchanged = (leftSerializer == leftCopy) && (rightSerializer == rightCopy);
-               if (unchanged) {
-                       return this;
-               }
-
-               // at least one nested serializer is stateful
-               return new EitherSerializer<L, R>(leftCopy, rightCopy);
-       }
-
-       /** @return a Right value wrapping a new instance from the right serializer (arbitrary choice) */
-       @Override
-       public Either<L, R> createInstance() {
-               return Right(rightSerializer.createInstance());
-       }
-
-       /** Copies the contained value with the serializer of its side and wraps it on the same side. */
-       @Override
-       public Either<L, R> copy(Either<L, R> from) {
-               if (from.isLeft()) {
-                       return Left(leftSerializer.copy(from.left()));
-               }
-               return Right(rightSerializer.copy(from.right()));
-       }
-
-       /**
-        * Copies {@code from}; the value inside {@code reuse} is handed to the right serializer only
-        * when both {@code from} and {@code reuse} are Right values.
-        */
-       @Override
-       public Either<L, R> copy(Either<L, R> from, Either<L, R> reuse) {
-               if (!from.isRight()) {
-                       // a Left value is always copied into a new object, the reuse record is not used
-                       return Left(leftSerializer.copy(from.left()));
-               }
-
-               final R value = from.right();
-               if (reuse.isRight()) {
-                       return Right(rightSerializer.copy(value, reuse.right()));
-               }
-
-               // the reuse record holds a Left value, so there is nothing to reuse
-               return Right(rightSerializer.copy(value));
-       }
-
-       /** @return always -1 */
-       @Override
-       public int getLength() {
-               return -1;
-       }
-
-       /** Writes the side flag followed by the value of that side. */
-       @Override
-       public void serialize(Either<L, R> record, DataOutputView target) throws IOException {
-               final boolean left = record.isLeft();
-               target.writeBoolean(left);
-
-               if (left) {
-                       leftSerializer.serialize(record.left(), target);
-               }
-               else {
-                       rightSerializer.serialize(record.right(), target);
-               }
-       }
-
-       /** Reads the side flag and then the value of that side into a new object. */
-       @Override
-       public Either<L, R> deserialize(DataInputView source) throws IOException {
-               if (source.readBoolean()) {
-                       return Left(leftSerializer.deserialize(source));
-               }
-               return Right(rightSerializer.deserialize(source));
-       }
-
-       /**
-        * Reads the side flag and then the value of that side; the value inside {@code reuse} is
-        * handed to the right serializer only when a Right value is read and {@code reuse} is a
-        * Right value.
-        */
-       @Override
-       public Either<L, R> deserialize(Either<L, R> reuse, DataInputView source) throws IOException {
-               final boolean left = source.readBoolean();
-               if (left) {
-                       // a Left value is always read into a new object, the reuse record is not used
-                       return Left(leftSerializer.deserialize(source));
-               }
-
-               if (reuse.isRight()) {
-                       return Right(rightSerializer.deserialize(reuse.right(), source));
-               }
-
-               // the reuse record holds a Left value, so there is nothing to reuse
-               return Right(rightSerializer.deserialize(source));
-       }
-
-       /** Copies one serialized record: the side flag, then the value via the matching serializer. */
-       @Override
-       public void copy(DataInputView source, DataOutputView target) throws IOException {
-               final boolean left = source.readBoolean();
-               target.writeBoolean(left);
-
-               if (left) {
-                       leftSerializer.copy(source, target);
-               }
-               else {
-                       rightSerializer.copy(source, target);
-               }
-       }
-
-       /** Two serializers are equal when both accept each other and their nested serializers are equal. */
-       @SuppressWarnings("unchecked")
-       @Override
-       public boolean equals(Object obj) {
-               if (!(obj instanceof EitherSerializer)) {
-                       return false;
-               }
-
-               EitherSerializer<L, R> that = (EitherSerializer<L, R>) obj;
-
-               return that.canEqual(this)
-                       && leftSerializer.equals(that.leftSerializer)
-                       && rightSerializer.equals(that.rightSerializer);
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof EitherSerializer;
-       }
-
-       @Override
-       public int hashCode() {
-               return 17 * leftSerializer.hashCode() + rightSerializer.hashCode();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
deleted file mode 100644
index 28fea6a..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.types.NormalizableKey;
-import org.apache.flink.util.InstantiationUtil;
-
-/**
- * TypeComparator for all types that extend Comparable.
- */
-public class GenericTypeComparator<T extends Comparable<T>> extends 
TypeComparator<T> {
-
-       private static final long serialVersionUID = 1L;
-
-       private final boolean ascending;
-
-       private final Class<T> type;
-
-       private TypeSerializer<T> serializer;
-
-       private transient T reference;
-
-       private transient T tmpReference;
-
-       @SuppressWarnings("rawtypes")
-       private final TypeComparator[] comparators = new TypeComparator[] 
{this};
-
-       // 
------------------------------------------------------------------------
-
-       public GenericTypeComparator(boolean ascending, TypeSerializer<T> 
serializer, Class<T> type) {
-               this.ascending = ascending;
-               this.serializer = serializer;
-               this.type = type;
-       }
-
-       private GenericTypeComparator(GenericTypeComparator<T> toClone) {
-               this.ascending = toClone.ascending;
-               this.serializer = toClone.serializer.duplicate();
-               this.type = toClone.type;
-       }
-
-       @Override
-       public int hash(T record) {
-               return record.hashCode();
-       }
-
-       @Override
-       public void setReference(T toCompare) {
-               this.reference = this.serializer.copy(toCompare);
-       }
-
-       @Override
-       public boolean equalToReference(T candidate) {
-               return candidate.equals(this.reference);
-       }
-
-       @Override
-       public int compareToReference(TypeComparator<T> referencedComparator) {
-               T otherRef = ((GenericTypeComparator<T>) 
referencedComparator).reference;
-               int cmp = otherRef.compareTo(this.reference);
-
-               return this.ascending ? cmp : -cmp;
-       }
-
-       @Override
-       public int compare(T first, T second) {
-               int cmp = first.compareTo(second);
-               return this.ascending ? cmp : -cmp;
-       }
-
-       @Override
-       public int compareSerialized(final DataInputView firstSource, final 
DataInputView secondSource) throws IOException {
-
-               if (this.reference == null) {
-                       this.reference = this.serializer.createInstance();
-               }
-
-               if (this.tmpReference == null) {
-                       this.tmpReference = this.serializer.createInstance();
-               }
-
-               this.reference = this.serializer.deserialize(this.reference, 
firstSource);
-               this.tmpReference = 
this.serializer.deserialize(this.tmpReference, secondSource);
-
-               int cmp = this.reference.compareTo(this.tmpReference);
-               return this.ascending ? cmp : -cmp;
-       }
-
-       @Override
-       public boolean supportsNormalizedKey() {
-               return NormalizableKey.class.isAssignableFrom(this.type);
-       }
-
-       @Override
-       public int getNormalizeKeyLen() {
-               if (this.reference == null) {
-                       this.reference = 
InstantiationUtil.instantiate(this.type);
-               }
-
-               NormalizableKey<?> key = (NormalizableKey<?>) this.reference;
-               return key.getMaxNormalizedKeyLen();
-       }
-
-       @Override
-       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-               return keyBytes < getNormalizeKeyLen();
-       }
-
-       @Override
-       public void putNormalizedKey(T record, MemorySegment target, int 
offset, int numBytes) {
-               NormalizableKey<?> key = (NormalizableKey<?>) record;
-               key.copyNormalizedKey(target, offset, numBytes);
-       }
-
-       @Override
-       public boolean invertNormalizedKey() {
-               return !ascending;
-       }
-
-       @Override
-       public TypeComparator<T> duplicate() {
-               return new GenericTypeComparator<T>(this);
-       }
-
-       @Override
-       public int extractKeys(Object record, Object[] target, int index) {
-               target[index] = record;
-               return 1;
-       }
-
-       @SuppressWarnings("rawtypes")
-       @Override
-       public TypeComparator[] getFlatComparators() {
-               return comparators;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public boolean supportsSerializationWithKeyNormalization() {
-               return false;
-       }
-
-       @Override
-       public void writeWithKeyNormalization(T record, DataOutputView target) 
throws IOException {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public T readWithKeyDenormalization(T reuse, DataInputView source) 
throws IOException {
-               throw new UnsupportedOperationException();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java
deleted file mode 100644
index faf5646..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.KryoException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.IOException;
-
-/**
- * Convenience methods for Kryo
- */
-public class KryoUtils {
-
-       /**
-        * Tries to copy the given record from using the provided Kryo 
instance. If this fails, then
-        * the record from is copied by serializing it into a byte buffer and 
deserializing it from
-        * there.
-        *
-        * @param from Element to copy
-        * @param kryo Kryo instance to use
-        * @param serializer TypeSerializer which is used in case of a Kryo 
failure
-        * @param <T> Type of the element to be copied
-        * @return Copied element
-        */
-       public static <T> T copy(T from, Kryo kryo, TypeSerializer<T> 
serializer) {
-               try {
-                       return kryo.copy(from);
-               } catch (KryoException ke) {
-                       // Kryo could not copy the object --> try to 
serialize/deserialize the object
-                       try {
-                               byte[] byteArray = 
InstantiationUtil.serializeToByteArray(serializer, from);
-
-                               return 
InstantiationUtil.deserializeFromByteArray(serializer, byteArray);
-                       } catch (IOException ioe) {
-                               throw new RuntimeException("Could not copy 
object by serializing/deserializing" +
-                                       " it.", ioe);
-                       }
-               }
-       }
-
-       /**
-        * Tries to copy the given record from using the provided Kryo 
instance. If this fails, then
-        * the record from is copied by serializing it into a byte buffer and 
deserializing it from
-        * there.
-        *
-        * @param from Element to copy
-        * @param reuse Reuse element for the deserialization
-        * @param kryo Kryo instance to use
-        * @param serializer TypeSerializer which is used in case of a Kryo 
failure
-        * @param <T> Type of the element to be copied
-        * @return Copied element
-        */
-       public static <T> T copy(T from, T reuse, Kryo kryo, TypeSerializer<T> 
serializer) {
-               try {
-                       return kryo.copy(from);
-               } catch (KryoException ke) {
-                       // Kryo could not copy the object --> try to 
serialize/deserialize the object
-                       try {
-                               byte[] byteArray = 
InstantiationUtil.serializeToByteArray(serializer, from);
-
-                               return 
InstantiationUtil.deserializeFromByteArray(serializer, reuse, byteArray);
-                       } catch (IOException ioe) {
-                               throw new RuntimeException("Could not copy 
object by serializing/deserializing" +
-                                       " it.", ioe);
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java
deleted file mode 100644
index 0f4fe94..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import com.esotericsoftware.kryo.KryoException;
-import com.esotericsoftware.kryo.io.Input;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-
-public class NoFetchingInput extends Input {
-       public NoFetchingInput(InputStream inputStream){
-               super(inputStream, 8);
-       }
-
-       @Override
-       public boolean eof(){
-               throw new UnsupportedOperationException("NoFetchingInput does 
not support EOF.");
-       }
-
-       @Override
-       public int read() throws KryoException {
-               require(1);
-               return buffer[position++] & 0xFF;
-       }
-
-       @Override
-       public boolean canReadInt() throws KryoException {
-               throw new UnsupportedOperationException("NoFetchingInput cannot 
prefetch data.");
-       }
-
-       @Override
-       public boolean canReadLong() throws KryoException {
-               throw new UnsupportedOperationException("NoFetchingInput cannot 
prefetch data.");
-       }
-
-       /**
-        * Require makes sure that at least required number of bytes are kept 
in the buffer. If not, then
-        * it will load exactly the difference between required and currently 
available number of bytes.
-        * Thus, it will only load the data which is required and never 
prefetch data.
-        *
-        * @param required the number of bytes being available in the buffer
-        * @return the number of bytes remaining, which is equal to required
-        * @throws KryoException
-        */
-       @Override
-       protected int require(int required) throws KryoException {
-               if(required > capacity) {
-                       throw new KryoException("Buffer too small: capacity: " 
+ capacity + ", " +
-                                       "required: " + required);
-               }
-
-               position = 0;
-               int bytesRead = 0;
-               int count;
-               while(true){
-                       count = fill(buffer, bytesRead, required - bytesRead);
-
-                       if(count == -1){
-                               throw new KryoException(new EOFException("No 
more bytes left."));
-                       }
-
-                       bytesRead += count;
-                       if(bytesRead == required){
-                               break;
-                       }
-               }
-               limit = required;
-               return required;
-       }
-
-       @Override
-       public int read(byte[] bytes, int offset, int count) throws 
KryoException {
-               if(bytes == null){
-                       throw new IllegalArgumentException("bytes cannot be 
null.");
-               }
-
-               try {
-                       return inputStream.read(bytes, offset, count);
-               }catch(IOException ex){
-                       throw new KryoException(ex);
-               }
-       }
-
-       @Override
-       public void skip(int count) throws KryoException {
-               try{
-                       inputStream.skip(count);
-               }catch(IOException ex){
-                       throw new KryoException(ex);
-               }
-       }
-
-       @Override
-       public void readBytes(byte[] bytes, int offset, int count) throws 
KryoException {
-               if(bytes == null){
-                       throw new IllegalArgumentException("bytes cannot be 
null.");
-               }
-
-               try{
-                       int bytesRead = 0;
-                       int c;
-
-                       while(true){
-                               c = inputStream.read(bytes, offset+bytesRead, 
count-bytesRead);
-
-                               if(c == -1){
-                                       throw new KryoException(new 
EOFException("No more bytes left."));
-                               }
-
-                               bytesRead += c;
-
-                               if(bytesRead == count){
-                                       break;
-                               }
-                       }
-               }catch(IOException ex){
-                       throw new KryoException(ex);
-               }
-       }
-
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
deleted file mode 100644
index c0c7797..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
+++ /dev/null
@@ -1,354 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Field;
-import java.util.List;
-
-import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.types.NullKeyFieldException;
-import org.apache.flink.util.InstantiationUtil;
-
-
-public final class PojoComparator<T> extends CompositeTypeComparator<T> 
implements java.io.Serializable {
-       
-       private static final long serialVersionUID = 1L;
-
-       // Reflection fields for the comp fields
-       private transient Field[] keyFields;
-
-       private final TypeComparator<Object>[] comparators;
-
-       private final int[] normalizedKeyLengths;
-
-       private final int numLeadingNormalizableKeys;
-
-       private final int normalizableKeyPrefixLen;
-
-       private final boolean invertNormKey;
-
-       private TypeSerializer<T> serializer;
-
-       private final Class<T> type;
-
-       @SuppressWarnings("unchecked")
-       public PojoComparator(Field[] keyFields, TypeComparator<?>[] 
comparators, TypeSerializer<T> serializer, Class<T> type) {
-               this.keyFields = keyFields;
-               this.comparators = (TypeComparator<Object>[]) comparators;
-
-               this.type = type;
-               this.serializer = serializer;
-
-               // set up auxiliary fields for normalized key support
-               this.normalizedKeyLengths = new int[keyFields.length];
-               int nKeys = 0;
-               int nKeyLen = 0;
-               boolean inverted = false;
-
-               for (int i = 0; i < this.comparators.length; i++) {
-                       TypeComparator<?> k = this.comparators[i];
-                       if(k == null) {
-                               throw new IllegalArgumentException("One of the 
passed comparators is null");
-                       }
-                       if(keyFields[i] == null) {
-                               throw new IllegalArgumentException("One of the 
passed reflection fields is null");
-                       }
-
-                       // as long as the leading keys support normalized keys, 
we can build up the composite key
-                       if (k.supportsNormalizedKey()) {
-                               if (i == 0) {
-                                       // the first comparator decides whether 
we need to invert the key direction
-                                       inverted = k.invertNormalizedKey();
-                               }
-                               else if (k.invertNormalizedKey() != inverted) {
-                                       // if a successor does not agree on the 
invertion direction, it cannot be part of the normalized key
-                                       break;
-                               }
-
-                               nKeys++;
-                               final int len = k.getNormalizeKeyLen();
-                               if (len < 0) {
-                                       throw new RuntimeException("Comparator 
" + k.getClass().getName() + " specifies an invalid length for the normalized 
key: " + len);
-                               }
-                               this.normalizedKeyLengths[i] = len;
-                               nKeyLen += this.normalizedKeyLengths[i];
-
-                               if (nKeyLen < 0) {
-                                       // overflow, which means we are out of 
budget for normalized key space anyways
-                                       nKeyLen = Integer.MAX_VALUE;
-                                       break;
-                               }
-                       } else {
-                               break;
-                       }
-               }
-               this.numLeadingNormalizableKeys = nKeys;
-               this.normalizableKeyPrefixLen = nKeyLen;
-               this.invertNormKey = inverted;
-       }
-
-       @SuppressWarnings("unchecked")
-       private PojoComparator(PojoComparator<T> toClone) {
-               this.keyFields = toClone.keyFields;
-               this.comparators = new 
TypeComparator[toClone.comparators.length];
-
-               for (int i = 0; i < toClone.comparators.length; i++) {
-                       this.comparators[i] = 
toClone.comparators[i].duplicate();
-               }
-
-               this.normalizedKeyLengths = toClone.normalizedKeyLengths;
-               this.numLeadingNormalizableKeys = 
toClone.numLeadingNormalizableKeys;
-               this.normalizableKeyPrefixLen = 
toClone.normalizableKeyPrefixLen;
-               this.invertNormKey = toClone.invertNormKey;
-
-               this.type = toClone.type;
-
-               try {
-                       this.serializer = (TypeSerializer<T>) 
InstantiationUtil.deserializeObject(
-                                       
InstantiationUtil.serializeObject(toClone.serializer), 
Thread.currentThread().getContextClassLoader());
-               } catch (IOException e) {
-                       throw new RuntimeException("Cannot copy serializer", e);
-               } catch (ClassNotFoundException e) {
-                       throw new RuntimeException("Cannot copy serializer", e);
-               }
-       }
-
-       private void writeObject(ObjectOutputStream out)
-                       throws IOException, ClassNotFoundException {
-               out.defaultWriteObject();
-               out.writeInt(keyFields.length);
-               for (Field field: keyFields) {
-                       out.writeObject(field.getDeclaringClass());
-                       out.writeUTF(field.getName());
-               }
-       }
-
-       private void readObject(ObjectInputStream in)
-                       throws IOException, ClassNotFoundException {
-               in.defaultReadObject();
-               int numKeyFields = in.readInt();
-               keyFields = new Field[numKeyFields];
-               for (int i = 0; i < numKeyFields; i++) {
-                       Class<?> clazz = (Class<?>) in.readObject();
-                       String fieldName = in.readUTF();
-                       // try superclasses as well
-                       while (clazz != null) {
-                               try {
-                                       Field field = 
clazz.getDeclaredField(fieldName);
-                                       field.setAccessible(true);
-                                       keyFields[i] = field;
-                                       break;
-                               } catch (NoSuchFieldException e) {
-                                       clazz = clazz.getSuperclass();
-                               }
-                       }
-                       if (keyFields[i] == null ) {
-                               throw new RuntimeException("Class resolved at 
TaskManager is not compatible with class read during Plan setup."
-                                               + " (" + fieldName + ")");
-                       }
-               }
-       }
-
-       public Field[] getKeyFields() {
-               return this.keyFields;
-       }
-
-       @SuppressWarnings({ "rawtypes", "unchecked" })
-       @Override
-       public void getFlatComparator(List<TypeComparator> flatComparators) {
-               for(int i = 0; i < comparators.length; i++) {
-                       if(comparators[i] instanceof CompositeTypeComparator) {
-                               
((CompositeTypeComparator)comparators[i]).getFlatComparator(flatComparators);
-                       } else {
-                               flatComparators.add(comparators[i]);
-                       }
-               }
-       }
-       
-       /**
-        * This method is handling the IllegalAccess exceptions of Field.get()
-        */
-       public final Object accessField(Field field, Object object) {
-               try {
-                       object = field.get(object);
-               } catch (NullPointerException npex) {
-                       throw new NullKeyFieldException("Unable to access field 
"+field+" on object "+object);
-               } catch (IllegalAccessException iaex) {
-                       throw new RuntimeException("This should not happen 
since we call setAccesssible(true) in PojoTypeInfo."
-                       + " fields: " + field + " obj: " + object);
-               }
-               return object;
-       }
-
-       @Override
-       public int hash(T value) {
-               int i = 0;
-               int code = 0;
-               for (; i < this.keyFields.length; i++) {
-                       code *= TupleComparatorBase.HASH_SALT[i & 0x1F];
-                       try {
-                               code += 
this.comparators[i].hash(accessField(keyFields[i], value));
-                       }catch(NullPointerException npe) {
-                               throw new RuntimeException("A 
NullPointerException occured while accessing a key field in a POJO. " +
-                                               "Most likely, the value 
grouped/joined on is null. Field name: "+keyFields[i].getName(), npe);
-                       }
-               }
-               return code;
-
-       }
-
-       @Override
-       public void setReference(T toCompare) {
-               int i = 0;
-               for (; i < this.keyFields.length; i++) {
-                       
this.comparators[i].setReference(accessField(keyFields[i], toCompare));
-               }
-       }
-
-       @Override
-       public boolean equalToReference(T candidate) {
-               int i = 0;
-               for (; i < this.keyFields.length; i++) {
-                       if 
(!this.comparators[i].equalToReference(accessField(keyFields[i], candidate))) {
-                               return false;
-                       }
-               }
-               return true;
-       }
-
-       @Override
-       public int compareToReference(TypeComparator<T> referencedComparator) {
-               PojoComparator<T> other = (PojoComparator<T>) 
referencedComparator;
-
-               int i = 0;
-               try {
-                       for (; i < this.keyFields.length; i++) {
-                               int cmp = 
this.comparators[i].compareToReference(other.comparators[i]);
-                               if (cmp != 0) {
-                                       return cmp;
-                               }
-                       }
-                       return 0;
-               }
-               catch (NullPointerException npex) {
-                       throw new 
NullKeyFieldException(this.keyFields[i].toString());
-               }
-       }
-
-       @Override
-       public int compare(T first, T second) {
-               int i = 0;
-               for (; i < keyFields.length; i++) {
-                       int cmp = 
comparators[i].compare(accessField(keyFields[i], first), 
accessField(keyFields[i], second));
-                       if (cmp != 0) {
-                               return cmp;
-                       }
-               }
-
-               return 0;
-       }
-
-       
-       @Override
-       public int compareSerialized(DataInputView firstSource, DataInputView 
secondSource) throws IOException {
-               T first = this.serializer.createInstance();
-               T second = this.serializer.createInstance();
-
-               first = this.serializer.deserialize(first, firstSource);
-               second = this.serializer.deserialize(second, secondSource);
-
-               return this.compare(first, second);
-       }
-
-       @Override
-       public boolean supportsNormalizedKey() {
-               return this.numLeadingNormalizableKeys > 0;
-       }
-
-       @Override
-       public int getNormalizeKeyLen() {
-               return this.normalizableKeyPrefixLen;
-       }
-
-       @Override
-       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-               return this.numLeadingNormalizableKeys < this.keyFields.length 
||
-                               this.normalizableKeyPrefixLen == 
Integer.MAX_VALUE ||
-                               this.normalizableKeyPrefixLen > keyBytes;
-       }
-
-       @Override
-       public void putNormalizedKey(T value, MemorySegment target, int offset, 
int numBytes) {
-               int i = 0;
-               for (; i < this.numLeadingNormalizableKeys & numBytes > 0; i++)
-               {
-                       int len = this.normalizedKeyLengths[i];
-                       len = numBytes >= len ? len : numBytes;
-                       
this.comparators[i].putNormalizedKey(accessField(keyFields[i], value), target, 
offset, len);
-                       numBytes -= len;
-                       offset += len;
-               }
-       }
-
-       @Override
-       public boolean invertNormalizedKey() {
-               return this.invertNormKey;
-       }
-
-
-       @Override
-       public boolean supportsSerializationWithKeyNormalization() {
-               return false;
-       }
-
-       @Override
-       public void writeWithKeyNormalization(T record, DataOutputView target) 
throws IOException {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public T readWithKeyDenormalization(T reuse, DataInputView source) 
throws IOException {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public PojoComparator<T> duplicate() {
-               return new PojoComparator<T>(this);
-       }
-
-       @Override
-       public int extractKeys(Object record, Object[] target, int index) {
-               int localIndex = index;
-               for (int i = 0; i < comparators.length; i++) {
-                       localIndex += 
comparators[i].extractKeys(accessField(keyFields[i], record), target, 
localIndex);
-               }
-               return localIndex - index;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-}
-

Reply via email to