http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
new file mode 100644
index 0000000..8c61a19
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import com.google.common.base.Objects;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.junit.Test;
+
+import java.util.Random;
+
+/**
+ * A test for the {@link PojoSerializer}.
+ */
+public class PojoSubclassSerializerTest extends 
SerializerTestBase<PojoSubclassSerializerTest.TestUserClassBase> {
+       private TypeInformation<TestUserClassBase> type = 
TypeExtractor.getForClass(TestUserClassBase.class);
+
+       @Override
+       protected TypeSerializer<TestUserClassBase> createSerializer() {
+               // only register one of the three child classes, the third 
child class is NO POJO
+               ExecutionConfig conf = new ExecutionConfig();
+               conf.registerPojoType(TestUserClass1.class);
+               TypeSerializer<TestUserClassBase> serializer = 
type.createSerializer(conf);
+               assert(serializer instanceof PojoSerializer);
+               return serializer;
+       }
+
+       @Override
+       protected int getLength() {
+               return -1;
+       }
+
+       @Override
+       protected Class<TestUserClassBase> getTypeClass() {
+               return TestUserClassBase.class;
+       }
+
+       @Override
+       protected TestUserClassBase[] getTestData() {
+               Random rnd = new Random(874597969123412341L);
+
+               return new TestUserClassBase[]{
+                               new TestUserClass1(rnd.nextInt(), "foo", 
rnd.nextLong()),
+                               new TestUserClass2(rnd.nextInt(), "bar", 
rnd.nextFloat()),
+                               new TestUserClass3(rnd.nextInt(), "bar", 
rnd.nextFloat())
+               };
+
+       }
+
+       @Override
+       @Test
+       public void testInstantiate() {
+               // don't do anything, since the PojoSerializer with subclass 
will return null
+       }
+
+       // User code class for testing the serializer
+       public static abstract class TestUserClassBase {
+               public int dumm1;
+               public String dumm2;
+
+
+               public TestUserClassBase() {
+               }
+
+               public TestUserClassBase(int dumm1, String dumm2) {
+                       this.dumm1 = dumm1;
+                       this.dumm2 = dumm2;
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hashCode(dumm1, dumm2);
+               }
+
+               @Override
+               public boolean equals(Object other) {
+                       if (!(other instanceof TestUserClassBase)) {
+                               return false;
+                       }
+                       TestUserClassBase otherTUC = (TestUserClassBase) other;
+                       if (dumm1 != otherTUC.dumm1) {
+                               return false;
+                       }
+                       if (!dumm2.equals(otherTUC.dumm2)) {
+                               return false;
+                       }
+                       return true;
+               }
+       }
+
+       public static class TestUserClass1 extends TestUserClassBase {
+               public long dumm3;
+
+               public TestUserClass1() {
+               }
+
+               public TestUserClass1(int dumm1, String dumm2, long dumm3) {
+                       super(dumm1, dumm2);
+                       this.dumm3 = dumm3;
+               }
+
+               @Override
+               public boolean equals(Object other) {
+                       if (!(other instanceof TestUserClass1)) {
+                               return false;
+                       }
+                       TestUserClass1 otherTUC = (TestUserClass1) other;
+                       if (dumm1 != otherTUC.dumm1) {
+                               return false;
+                       }
+                       if (!dumm2.equals(otherTUC.dumm2)) {
+                               return false;
+                       }
+                       if (dumm3 != otherTUC.dumm3) {
+                               return false;
+                       }
+                       return true;
+               }
+       }
+
+       public static class TestUserClass2 extends TestUserClassBase {
+               public float dumm4;
+
+               public TestUserClass2() {
+               }
+
+               public TestUserClass2(int dumm1, String dumm2, float dumm4) {
+                       super(dumm1, dumm2);
+                       this.dumm4 = dumm4;
+               }
+
+               @Override
+               public boolean equals(Object other) {
+                       if (!(other instanceof TestUserClass2)) {
+                               return false;
+                       }
+                       TestUserClass2 otherTUC = (TestUserClass2) other;
+                       if (dumm1 != otherTUC.dumm1) {
+                               return false;
+                       }
+                       if (!dumm2.equals(otherTUC.dumm2)) {
+                               return false;
+                       }
+                       if (dumm4 != otherTUC.dumm4) {
+                               return false;
+                       }
+                       return true;
+               }
+       }
+
+       public static class TestUserClass3 extends TestUserClassBase {
+               public float dumm4;
+
+               public TestUserClass3(int dumm1, String dumm2, float dumm4) {
+                       super(dumm1, dumm2);
+                       this.dumm4 = dumm4;
+               }
+
+               @Override
+               public boolean equals(Object other) {
+                       if (!(other instanceof TestUserClass3)) {
+                               return false;
+                       }
+                       TestUserClass3 otherTUC = (TestUserClass3) other;
+                       if (dumm1 != otherTUC.dumm1) {
+                               return false;
+                       }
+                       if (!dumm2.equals(otherTUC.dumm2)) {
+                               return false;
+                       }
+                       if (dumm4 != otherTUC.dumm4) {
+                               return false;
+                       }
+                       return true;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
new file mode 100644
index 0000000..7c608f2
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public class StringArrayWritable implements Writable, 
Comparable<StringArrayWritable> {
+       
+       private String[] array = new String[0];
+       
+       public StringArrayWritable() {
+               super();
+       }
+       
+       public StringArrayWritable(String[] array) {
+               this.array = array;
+       }
+       
+       @Override
+       public void write(DataOutput out) throws IOException {
+               out.writeInt(this.array.length);
+               
+               for(String str : this.array) {
+                       byte[] b = str.getBytes();
+                       out.writeInt(b.length);
+                       out.write(b);
+               }
+       }
+       
+       @Override
+       public void readFields(DataInput in) throws IOException {
+               this.array = new String[in.readInt()];
+               
+               for(int i = 0; i < this.array.length; i++) {
+                       byte[] b = new byte[in.readInt()];
+                       in.readFully(b);
+                       this.array[i] = new String(b);
+               }
+       }
+       
+       @Override
+       public int compareTo(StringArrayWritable o) {
+               if(this.array.length != o.array.length) {
+                       return this.array.length - o.array.length;
+               }
+               
+               for(int i = 0; i < this.array.length; i++) {
+                       int comp = this.array[i].compareTo(o.array[i]);
+                       if(comp != 0) {
+                               return comp;
+                       }
+               }
+               return 0;
+       }
+       
+       @Override
+       public boolean equals(Object obj) {
+               if(!(obj instanceof StringArrayWritable)) {
+                       return false;
+               }
+               return this.compareTo((StringArrayWritable) obj) == 0;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
new file mode 100644
index 0000000..b797090
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import com.google.common.base.Objects;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.junit.Test;
+
+import java.util.Random;
+
+/**
+ * Testing the serialization of classes which are subclasses of a class that 
implements an interface.
+ */
+public class SubclassFromInterfaceSerializerTest extends 
SerializerTestBase<SubclassFromInterfaceSerializerTest.TestUserInterface> {
+       private TypeInformation<TestUserInterface> type = 
TypeExtractor.getForClass(TestUserInterface.class);
+
+       @Override
+       protected TypeSerializer<TestUserInterface> createSerializer() {
+               // only register one of the two child classes
+               ExecutionConfig conf = new ExecutionConfig();
+               conf.registerPojoType(TestUserClass2.class);
+               TypeSerializer<TestUserInterface> serializer = 
type.createSerializer(conf);
+               assert(serializer instanceof KryoSerializer);
+               return serializer;
+       }
+
+       @Override
+       protected int getLength() {
+               return -1;
+       }
+
+       @Override
+       protected Class<TestUserInterface> getTypeClass() {
+               return TestUserInterface.class;
+       }
+
+       @Override
+       protected TestUserInterface[] getTestData() {
+               Random rnd = new Random(874597969123412341L);
+
+               return new TestUserInterface[]{
+                               new TestUserClass1(rnd.nextInt(), "foo", 
rnd.nextLong()),
+                               new TestUserClass2(rnd.nextInt(), "bar", 
rnd.nextFloat())
+               };
+
+       }
+
+       @Override
+       @Test
+       public void testInstantiate() {
+               // don't do anything, since the PojoSerializer with subclass 
will return null
+       }
+
+       public interface TestUserInterface {}
+
+       // User code class for testing the serializer
+       public static class TestUserClassBase implements TestUserInterface {
+               public int dumm1;
+               public String dumm2;
+
+
+               public TestUserClassBase() {
+               }
+
+               public TestUserClassBase(int dumm1, String dumm2) {
+                       this.dumm1 = dumm1;
+                       this.dumm2 = dumm2;
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hashCode(dumm1, dumm2);
+               }
+
+               @Override
+               public boolean equals(Object other) {
+                       if (!(other instanceof TestUserClassBase)) {
+                               return false;
+                       }
+                       TestUserClassBase otherTUC = (TestUserClassBase) other;
+                       if (dumm1 != otherTUC.dumm1) {
+                               return false;
+                       }
+                       if (!dumm2.equals(otherTUC.dumm2)) {
+                               return false;
+                       }
+                       return true;
+               }
+       }
+
+       public static class TestUserClass1 extends TestUserClassBase {
+               public long dumm3;
+
+               public TestUserClass1() {
+               }
+
+               public TestUserClass1(int dumm1, String dumm2, long dumm3) {
+                       super(dumm1, dumm2);
+                       this.dumm3 = dumm3;
+               }
+
+               @Override
+               public boolean equals(Object other) {
+                       if (!(other instanceof TestUserClass1)) {
+                               return false;
+                       }
+                       TestUserClass1 otherTUC = (TestUserClass1) other;
+                       if (dumm1 != otherTUC.dumm1) {
+                               return false;
+                       }
+                       if (!dumm2.equals(otherTUC.dumm2)) {
+                               return false;
+                       }
+                       if (dumm3 != otherTUC.dumm3) {
+                               return false;
+                       }
+                       return true;
+               }
+       }
+
+       public static class TestUserClass2 extends TestUserClassBase {
+               public float dumm4;
+
+               public TestUserClass2() {
+               }
+
+               public TestUserClass2(int dumm1, String dumm2, float dumm4) {
+                       super(dumm1, dumm2);
+                       this.dumm4 = dumm4;
+               }
+
+               @Override
+               public boolean equals(Object other) {
+                       if (!(other instanceof TestUserClass2)) {
+                               return false;
+                       }
+                       TestUserClass2 otherTUC = (TestUserClass2) other;
+                       if (dumm1 != otherTUC.dumm1) {
+                               return false;
+                       }
+                       if (!dumm2.equals(otherTUC.dumm2)) {
+                               return false;
+                       }
+                       if (dumm4 != otherTUC.dumm4) {
+                               return false;
+                       }
+                       return true;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
new file mode 100644
index 0000000..87be6db
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemoryUtils;
+
+public final class TestDataOutputSerializer implements DataOutputView {
+       
+       private byte[] buffer;
+       
+       private int position;
+
+       private ByteBuffer wrapper;
+       
+       private final int maxSize;
+       
+
+       public TestDataOutputSerializer(int startSize) {
+               this(startSize, Integer.MAX_VALUE);
+       }
+       
+       public TestDataOutputSerializer(int startSize, int maxSize) {
+               if (startSize < 1 || startSize > maxSize) {
+                       throw new IllegalArgumentException();
+               }
+
+               this.buffer = new byte[startSize];
+               this.wrapper = ByteBuffer.wrap(buffer);
+               this.maxSize = maxSize;
+       }
+       
+       public ByteBuffer wrapAsByteBuffer() {
+               this.wrapper.position(0);
+               this.wrapper.limit(this.position);
+               return this.wrapper;
+       }
+
+       public byte[] copyByteBuffer() {
+               byte[] target = new byte[position];
+               System.arraycopy(buffer, 0, target, 0, position);
+
+               return target;
+       }
+
+       public void clear() {
+               this.position = 0;
+       }
+
+       public int length() {
+               return this.position;
+       }
+
+       @Override
+       public String toString() {
+               return String.format("[pos=%d cap=%d]", this.position, 
this.buffer.length);
+       }
+
+       // 
----------------------------------------------------------------------------------------
+       //                               Data Output
+       // 
----------------------------------------------------------------------------------------
+       
+       @Override
+       public void write(int b) throws IOException {
+               if (this.position >= this.buffer.length) {
+                       resize(1);
+               }
+               this.buffer[this.position++] = (byte) (b & 0xff);
+       }
+
+       @Override
+       public void write(byte[] b) throws IOException {
+               write(b, 0, b.length);
+       }
+
+       @Override
+       public void write(byte[] b, int off, int len) throws IOException {
+               if (len < 0 || off > b.length - len) {
+                       throw new ArrayIndexOutOfBoundsException();
+               }
+               if (this.position > this.buffer.length - len) {
+                       resize(len);
+               }
+               System.arraycopy(b, off, this.buffer, this.position, len);
+               this.position += len;
+       }
+
+       @Override
+       public void writeBoolean(boolean v) throws IOException {
+               write(v ? 1 : 0);
+       }
+
+       @Override
+       public void writeByte(int v) throws IOException {
+               write(v);
+       }
+
+       @Override
+       public void writeBytes(String s) throws IOException {
+               final int sLen = s.length();
+               if (this.position >= this.buffer.length - sLen) {
+                       resize(sLen);
+               }
+               
+               for (int i = 0; i < sLen; i++) {
+                       writeByte(s.charAt(i));
+               }
+               this.position += sLen;
+       }
+
+       @Override
+       public void writeChar(int v) throws IOException {
+               if (this.position >= this.buffer.length - 1) {
+                       resize(2);
+               }
+               this.buffer[this.position++] = (byte) (v >> 8);
+               this.buffer[this.position++] = (byte) v;
+       }
+
+       @Override
+       public void writeChars(String s) throws IOException {
+               final int sLen = s.length();
+               if (this.position >= this.buffer.length - 2*sLen) {
+                       resize(2*sLen);
+               } 
+               for (int i = 0; i < sLen; i++) {
+                       writeChar(s.charAt(i));
+               }
+       }
+
+       @Override
+       public void writeDouble(double v) throws IOException {
+               writeLong(Double.doubleToLongBits(v));
+       }
+
+       @Override
+       public void writeFloat(float v) throws IOException {
+               writeInt(Float.floatToIntBits(v));
+       }
+
+       @SuppressWarnings("restriction")
+       @Override
+       public void writeInt(int v) throws IOException {
+               if (this.position >= this.buffer.length - 3) {
+                       resize(4);
+               }
+               if (LITTLE_ENDIAN) {
+                       v = Integer.reverseBytes(v);
+               }                       
+               UNSAFE.putInt(this.buffer, BASE_OFFSET + this.position, v);
+               this.position += 4;
+       }
+
+       @SuppressWarnings("restriction")
+       @Override
+       public void writeLong(long v) throws IOException {
+               if (this.position >= this.buffer.length - 7) {
+                       resize(8);
+               }
+               if (LITTLE_ENDIAN) {
+                       v = Long.reverseBytes(v);
+               }
+               UNSAFE.putLong(this.buffer, BASE_OFFSET + this.position, v);
+               this.position += 8;
+       }
+
+       @Override
+       public void writeShort(int v) throws IOException {
+               if (this.position >= this.buffer.length - 1) {
+                       resize(2);
+               }
+               this.buffer[this.position++] = (byte) ((v >>> 8) & 0xff);
+               this.buffer[this.position++] = (byte) ((v >>> 0) & 0xff);
+       }
+
+       @Override
+       public void writeUTF(String str) throws IOException {
+               int strlen = str.length();
+               int utflen = 0;
+               int c;
+
+               /* use charAt instead of copying String to char array */
+               for (int i = 0; i < strlen; i++) {
+                       c = str.charAt(i);
+                       if ((c >= 0x0001) && (c <= 0x007F)) {
+                               utflen++;
+                       } else if (c > 0x07FF) {
+                               utflen += 3;
+                       } else {
+                               utflen += 2;
+                       }
+               }
+
+               if (utflen > 65535) {
+                       throw new UTFDataFormatException("Encoded string is too 
long: " + utflen);
+               }
+               else if (this.position > this.buffer.length - utflen - 2) {
+                       resize(utflen + 2);
+               }
+               
+               byte[] bytearr = this.buffer;
+               int count = this.position;
+
+               bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
+               bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
+
+               int i = 0;
+               for (i = 0; i < strlen; i++) {
+                       c = str.charAt(i);
+                       if (!((c >= 0x0001) && (c <= 0x007F))) {
+                               break;
+                       }
+                       bytearr[count++] = (byte) c;
+               }
+
+               for (; i < strlen; i++) {
+                       c = str.charAt(i);
+                       if ((c >= 0x0001) && (c <= 0x007F)) {
+                               bytearr[count++] = (byte) c;
+
+                       } else if (c > 0x07FF) {
+                               bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 
0x0F));
+                               bytearr[count++] = (byte) (0x80 | ((c >> 6) & 
0x3F));
+                               bytearr[count++] = (byte) (0x80 | ((c >> 0) & 
0x3F));
+                       } else {
+                               bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 
0x1F));
+                               bytearr[count++] = (byte) (0x80 | ((c >> 0) & 
0x3F));
+                       }
+               }
+
+               this.position = count;
+       }
+       
+       
+       private void resize(int minCapacityAdd) throws IOException {
+               try {
+                       int newLen = Math.max(this.buffer.length * 2, 
this.buffer.length + minCapacityAdd);
+                       
+                       if (newLen > maxSize) {
+                               
+                               if (this.buffer.length + minCapacityAdd > 
maxSize) {
+                                       throw new EOFException("Exceeded 
maximum capacity");
+                               }
+                               
+                               newLen = maxSize;
+                       }
+                       
+                       final byte[] nb = new byte[newLen];
+                       System.arraycopy(this.buffer, 0, nb, 0, this.position);
+                       this.buffer = nb;
+                       this.wrapper = ByteBuffer.wrap(this.buffer);
+               }
+               catch (NegativeArraySizeException nasex) {
+                       throw new IOException("Serialization failed because the 
record length would exceed 2GB (max addressable array size in Java).");
+               }
+       }
+       
+       @SuppressWarnings("restriction")
+       private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+       
+       @SuppressWarnings("restriction")
+       private static final long BASE_OFFSET = 
UNSAFE.arrayBaseOffset(byte[].class);
+       
+       private static final boolean LITTLE_ENDIAN = 
(MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
+
+       @Override
+       public void skipBytesToWrite(int numBytes) throws IOException {
+               if(buffer.length - this.position < numBytes){
+                       throw new EOFException("Could not skip " + numBytes + " 
bytes.");
+               }
+
+               this.position += numBytes;
+       }
+
+       @Override
+       public void write(DataInputView source, int numBytes) throws 
IOException {
+               if(buffer.length - this.position < numBytes){
+                       throw new EOFException("Could not write " + numBytes + 
" bytes. Buffer overflow.");
+               }
+
+               source.read(this.buffer, this.position, numBytes);
+               this.position += numBytes;
+       }
+       
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java
new file mode 100644
index 0000000..cfc4914
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+public class TupleComparatorILD2Test extends 
TupleComparatorTestBase<Tuple3<Integer, Long, Double>> {
+
+       @SuppressWarnings("unchecked")
+       Tuple3<Integer, Long, Double>[] dataISD = new Tuple3[]{
+               new Tuple3<Integer, Long, Double>(4, 14L, 20.0),
+               new Tuple3<Integer, Long, Double>(4, 15L, 23.2),
+               new Tuple3<Integer, Long, Double>(5, 15L, 20.0),
+               new Tuple3<Integer, Long, Double>(5, 20L, 20.0),
+               new Tuple3<Integer, Long, Double>(6, 20L, 23.2),
+               new Tuple3<Integer, Long, Double>(6, 29L, 20.0),
+               new Tuple3<Integer, Long, Double>(7, 29L, 20.0),
+               new Tuple3<Integer, Long, Double>(7, 34L, 23.2)
+       };
+
+       @Override
+       protected TupleComparator<Tuple3<Integer, Long, Double>> 
createComparator(boolean ascending) {
+               return new TupleComparator<Tuple3<Integer, Long, Double>>(
+                               new int[]{0, 1},
+                               new TypeComparator[]{
+                                       new IntComparator(ascending),
+                                       new LongComparator(ascending)
+                               },
+                               new TypeSerializer[]{ IntSerializer.INSTANCE, 
LongSerializer.INSTANCE });
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       protected TupleSerializer<Tuple3<Integer, Long, Double>> 
createSerializer() {
+               return new TupleSerializer<Tuple3<Integer, Long, Double>>(
+                               (Class<Tuple3<Integer, Long, Double>>) 
(Class<?>) Tuple3.class,
+                               new TypeSerializer[]{
+                                       new IntSerializer(),
+                                       new LongSerializer(),
+                                       new DoubleSerializer()});
+       }
+
+       @Override
+       protected Tuple3<Integer, Long, Double>[] getSortedTestData() {
+               return dataISD;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java
new file mode 100644
index 0000000..e5a0e6c
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+public class TupleComparatorILD3Test extends 
TupleComparatorTestBase<Tuple3<Integer, Long, Double>> {
+
+       @SuppressWarnings("unchecked")
+       Tuple3<Integer, Long, Double>[] dataISD = new Tuple3[]{
+               new Tuple3<Integer, Long, Double>(4, Long.valueOf(4), 20.0),
+               new Tuple3<Integer, Long, Double>(4, Long.valueOf(4), 23.2),
+               new Tuple3<Integer, Long, Double>(4, Long.valueOf(9), 20.0),
+               new Tuple3<Integer, Long, Double>(5, Long.valueOf(4), 20.0),
+               new Tuple3<Integer, Long, Double>(5, Long.valueOf(4), 23.2),
+               new Tuple3<Integer, Long, Double>(5, Long.valueOf(9), 20.0),
+               new Tuple3<Integer, Long, Double>(6, Long.valueOf(4), 20.0),
+               new Tuple3<Integer, Long, Double>(6, Long.valueOf(4), 23.2)
+       };
+
+       @Override
+       protected TupleComparator<Tuple3<Integer, Long, Double>> 
createComparator(boolean ascending) {
+               return new TupleComparator<Tuple3<Integer, Long, Double>>(
+                               new int[]{0, 1, 2},
+                               new TypeComparator[]{
+                                       new IntComparator(ascending),
+                                       new LongComparator(ascending),
+                                       new DoubleComparator(ascending)
+                               },
+               new TypeSerializer[]{ IntSerializer.INSTANCE, 
LongSerializer.INSTANCE, DoubleSerializer.INSTANCE });
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       protected TupleSerializer<Tuple3<Integer, Long, Double>> 
createSerializer() {
+               return new TupleSerializer<Tuple3<Integer, Long, Double>>(
+                               (Class<Tuple3<Integer, Long, Double>>) 
(Class<?>) Tuple3.class,
+                               new TypeSerializer[]{
+                                       new IntSerializer(),
+                                       new LongSerializer(),
+                                       new DoubleSerializer()});
+       }
+
+       @Override
+       protected Tuple3<Integer, Long, Double>[] getSortedTestData() {
+               return dataISD;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java
new file mode 100644
index 0000000..a1e6c40
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+public class TupleComparatorILDC3Test extends 
TupleComparatorTestBase<Tuple3<Integer, Long, Double>> {
+
+       @SuppressWarnings("unchecked")
+       Tuple3<Integer, Long, Double>[] dataISD = new Tuple3[]{
+               new Tuple3<Integer, Long, Double>(4, Long.valueOf(4), 20.0),
+               new Tuple3<Integer, Long, Double>(5, Long.valueOf(1), 20.0),
+               new Tuple3<Integer, Long, Double>(5, Long.valueOf(2), 20.0),
+               new Tuple3<Integer, Long, Double>(5, Long.valueOf(10), 23.0),
+               new Tuple3<Integer, Long, Double>(5, Long.valueOf(19), 24.0),
+               new Tuple3<Integer, Long, Double>(5, Long.valueOf(20), 24.0),
+               new Tuple3<Integer, Long, Double>(5, Long.valueOf(24), 25.0),
+               new Tuple3<Integer, Long, Double>(5, Long.valueOf(25), 25.0)
+       };
+
+       @Override
+       protected TupleComparator<Tuple3<Integer, Long, Double>> 
createComparator(boolean ascending) {
+               return new TupleComparator<Tuple3<Integer, Long, Double>>(
+                               new int[]{2, 0, 1},
+                               new TypeComparator[]{
+                                       new DoubleComparator(ascending),
+                                       new IntComparator(ascending),
+                                       new LongComparator(ascending)
+                               },
+               new TypeSerializer[]{ IntSerializer.INSTANCE, 
LongSerializer.INSTANCE, DoubleSerializer.INSTANCE });
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       protected TupleSerializer<Tuple3<Integer, Long, Double>> 
createSerializer() {
+               return new TupleSerializer<Tuple3<Integer, Long, Double>>(
+                               (Class<Tuple3<Integer, Long, Double>>) 
(Class<?>) Tuple3.class,
+                               new TypeSerializer[]{
+                                       new IntSerializer(),
+                                       new LongSerializer(),
+                                       new DoubleSerializer()});
+       }
+
+       @Override
+       protected Tuple3<Integer, Long, Double>[] getSortedTestData() {
+               return dataISD;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java
new file mode 100644
index 0000000..b5c0c1f
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+public class TupleComparatorILDX1Test extends 
TupleComparatorTestBase<Tuple3<Integer, Long, Double>> {
+
+       @SuppressWarnings("unchecked")
+       Tuple3<Integer, Long, Double>[] dataISD = new Tuple3[]{
+               new Tuple3<Integer, Long, Double>(4, Long.valueOf(4), 20.0),
+               new Tuple3<Integer, Long, Double>(4, Long.valueOf(5), 23.2),
+               new Tuple3<Integer, Long, Double>(4, Long.valueOf(9), 20.0),
+               new Tuple3<Integer, Long, Double>(4, Long.valueOf(10), 24.0),
+               new Tuple3<Integer, Long, Double>(4, Long.valueOf(19), 23.2),
+               new Tuple3<Integer, Long, Double>(4, Long.valueOf(20), 24.0),
+               new Tuple3<Integer, Long, Double>(4, Long.valueOf(24), 20.0),
+               new Tuple3<Integer, Long, Double>(4, Long.valueOf(25), 23.2)
+       };
+
+       @Override
+       protected TupleComparator<Tuple3<Integer, Long, Double>> 
createComparator(boolean ascending) {
+               return new TupleComparator<Tuple3<Integer, Long, Double>>(
+                               new int[]{1},
+                               new TypeComparator[]{
+                                       new LongComparator(ascending)
+                               },
+               new TypeSerializer[]{  IntSerializer.INSTANCE, 
LongSerializer.INSTANCE });
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       protected TupleSerializer<Tuple3<Integer, Long, Double>> 
createSerializer() {
+               return new TupleSerializer<Tuple3<Integer, Long, Double>>(
+                               (Class<Tuple3<Integer, Long, Double>>) 
(Class<?>) Tuple3.class,
+                               new TypeSerializer[]{
+                                       new IntSerializer(),
+                                       new LongSerializer(),
+                                       new DoubleSerializer()});
+       }
+
+       @Override
+       protected Tuple3<Integer, Long, Double>[] getSortedTestData() {
+               return dataISD;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java
new file mode 100644
index 0000000..793a2f4
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+public class TupleComparatorILDXC2Test extends 
TupleComparatorTestBase<Tuple3<Integer, Long, Double>> {
+
+       @SuppressWarnings("unchecked")
+       Tuple3<Integer, Long, Double>[] dataISD = new Tuple3[]{
+               new Tuple3<Integer, Long, Double>(4, 4L, 20.0),
+               new Tuple3<Integer, Long, Double>(4, 5L, 20.0),
+               new Tuple3<Integer, Long, Double>(4, 3L, 23.0),
+               new Tuple3<Integer, Long, Double>(4, 19L, 23.0),
+               new Tuple3<Integer, Long, Double>(4, 17L, 24.0),
+               new Tuple3<Integer, Long, Double>(4, 18L, 24.0),
+               new Tuple3<Integer, Long, Double>(4, 24L, 25.0),
+               new Tuple3<Integer, Long, Double>(4, 25L, 25.0)
+       };
+
+       @Override
+       protected TupleComparator<Tuple3<Integer, Long, Double>> 
createComparator(boolean ascending) {
+               return new TupleComparator<Tuple3<Integer, Long, Double>>(
+                               new int[]{2, 1},
+                               new TypeComparator[]{
+                                       new DoubleComparator(ascending),
+                                       new LongComparator(ascending)
+                               },
+               new TypeSerializer[]{ IntSerializer.INSTANCE, 
DoubleSerializer.INSTANCE, LongSerializer.INSTANCE });
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       protected TupleSerializer<Tuple3<Integer, Long, Double>> 
createSerializer() {
+               return new TupleSerializer<Tuple3<Integer, Long, Double>>(
+                               (Class<Tuple3<Integer, Long, Double>>) 
(Class<?>) Tuple3.class,
+                               new TypeSerializer[]{
+                                       new IntSerializer(),
+                                       new LongSerializer(),
+                                       new DoubleSerializer()});
+       }
+
+       @Override
+       protected Tuple3<Integer, Long, Double>[] getSortedTestData() {
+               return dataISD;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java
new file mode 100644
index 0000000..8cdee9b
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+public class TupleComparatorISD1Test extends 
TupleComparatorTestBase<Tuple3<Integer, String, Double>> {
+
+       @SuppressWarnings("unchecked")
+       Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
+               new Tuple3<Integer, String, Double>(4, "hello", 20.0),
+               new Tuple3<Integer, String, Double>(5, "hello", 23.2),
+               new Tuple3<Integer, String, Double>(6, "world", 20.0),
+               new Tuple3<Integer, String, Double>(7, "hello", 20.0),
+               new Tuple3<Integer, String, Double>(8, "hello", 23.2),
+               new Tuple3<Integer, String, Double>(9, "world", 20.0),
+               new Tuple3<Integer, String, Double>(10, "hello", 20.0),
+               new Tuple3<Integer, String, Double>(11, "hello", 23.2)
+       };
+
+       @Override
+       protected TupleComparator<Tuple3<Integer, String, Double>> 
createComparator(boolean ascending) {
+               return new TupleComparator<Tuple3<Integer, String, Double>>(
+                               new int[]{0},
+                               new TypeComparator[]{ new 
IntComparator(ascending) },
+                               new TypeSerializer[]{ IntSerializer.INSTANCE });
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       protected TupleSerializer<Tuple3<Integer, String, Double>> 
createSerializer() {
+               return new TupleSerializer<Tuple3<Integer, String, Double>>(
+                               (Class<Tuple3<Integer, String, Double>>) 
(Class<?>) Tuple3.class,
+                               new TypeSerializer[]{
+                                       new IntSerializer(),
+                                       new StringSerializer(),
+                                       new DoubleSerializer()});
+       }
+
+       @Override
+       protected Tuple3<Integer, String, Double>[] getSortedTestData() {
+               return dataISD;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java
new file mode 100644
index 0000000..06c292f
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringComparator;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+public class TupleComparatorISD2Test extends 
TupleComparatorTestBase<Tuple3<Integer, String, Double>> {
+
+       @SuppressWarnings("unchecked")
+       Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
+               new Tuple3<Integer, String, Double>(4, "hello", 20.0),
+               new Tuple3<Integer, String, Double>(4, "world", 23.2),
+               new Tuple3<Integer, String, Double>(5, "hello", 20.0),
+               new Tuple3<Integer, String, Double>(5, "world", 20.0),
+               new Tuple3<Integer, String, Double>(6, "hello", 23.2),
+               new Tuple3<Integer, String, Double>(6, "world", 20.0),
+               new Tuple3<Integer, String, Double>(7, "hello", 20.0),
+               new Tuple3<Integer, String, Double>(7, "world", 23.2)
+       };
+
+       @Override
+       protected TupleComparator<Tuple3<Integer, String, Double>> 
createComparator(boolean ascending) {
+               return new TupleComparator<Tuple3<Integer, String, Double>>(
+                               new int[]{0, 1},
+                               new TypeComparator[]{
+                                       new IntComparator(ascending),
+                                       new StringComparator(ascending)
+                               },
+               new TypeSerializer[]{ IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, DoubleSerializer.INSTANCE });
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       protected TupleSerializer<Tuple3<Integer, String, Double>> 
createSerializer() {
+               return new TupleSerializer<Tuple3<Integer, String, Double>>(
+                               (Class<Tuple3<Integer, String, Double>>) 
(Class<?>) Tuple3.class,
+                               new TypeSerializer[]{
+                                       new IntSerializer(),
+                                       new StringSerializer(),
+                                       new DoubleSerializer()});
+       }
+
+       @Override
+       protected Tuple3<Integer, String, Double>[] getSortedTestData() {
+               return dataISD;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java
new file mode 100644
index 0000000..d823a29
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringComparator;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+public class TupleComparatorISD3Test extends 
TupleComparatorTestBase<Tuple3<Integer, String, Double>> {
+
+       @SuppressWarnings("unchecked")
+       Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
+               new Tuple3<Integer, String, Double>(4, "hello", 20.0),
+               new Tuple3<Integer, String, Double>(4, "hello", 23.2),
+               new Tuple3<Integer, String, Double>(4, "world", 20.0),
+               new Tuple3<Integer, String, Double>(5, "hello", 20.0),
+               new Tuple3<Integer, String, Double>(5, "hello", 23.2),
+               new Tuple3<Integer, String, Double>(5, "world", 20.0),
+               new Tuple3<Integer, String, Double>(6, "hello", 20.0),
+               new Tuple3<Integer, String, Double>(6, "hello", 23.2)
+       };
+
+       @Override
+       protected TupleComparator<Tuple3<Integer, String, Double>> 
createComparator(boolean ascending) {
+               return new TupleComparator<Tuple3<Integer, String, Double>>(
+                               new int[]{0, 1, 2},
+                               new TypeComparator[]{
+                                       new IntComparator(ascending),
+                                       new StringComparator(ascending),
+                                       new DoubleComparator(ascending)
+                               },
+               new TypeSerializer[]{ IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, DoubleSerializer.INSTANCE });
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       protected TupleSerializer<Tuple3<Integer, String, Double>> 
createSerializer() {
+               return new TupleSerializer<Tuple3<Integer, String, Double>>(
+                               (Class<Tuple3<Integer, String, Double>>) 
(Class<?>) Tuple3.class,
+                               new TypeSerializer[]{
+                                       new IntSerializer(),
+                                       new StringSerializer(),
+                                       new DoubleSerializer()});
+       }
+
+       @Override
+       protected Tuple3<Integer, String, Double>[] getSortedTestData() {
+               return dataISD;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
new file mode 100644
index 0000000..cf73be2
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringComparator;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+public class TupleComparatorTTT1Test extends 
TupleComparatorTestBase<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>> {
+
+       @SuppressWarnings("unchecked")
+       Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, 
Long>>[] dataISD = new Tuple3[]{
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 1.0), new 
Tuple2<Long, Long>(1L, 1L), new Tuple2<Integer, Long>(4, -10L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 2.0), new 
Tuple2<Long, Long>(1L, 2L), new Tuple2<Integer, Long>(4, -5L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.0), new 
Tuple2<Long, Long>(1L, 3L), new Tuple2<Integer, Long>(4, 0L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.5), new 
Tuple2<Long, Long>(1L, 4L), new Tuple2<Integer, Long>(4, 5L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 4325.12), new 
Tuple2<Long, Long>(1L, 5L), new Tuple2<Integer, Long>(4, 15L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 1.0), new 
Tuple2<Long, Long>(2L, 4L), new Tuple2<Integer, Long>(45, -5L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 2.0), new 
Tuple2<Long, Long>(2L, 6L), new Tuple2<Integer, Long>(45, 5L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.0), new 
Tuple2<Long, Long>(2L, 8L), new Tuple2<Integer, Long>(323, 2L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.5), new 
Tuple2<Long, Long>(2L, 9L), new Tuple2<Integer, Long>(323, 5L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 4325.12), new 
Tuple2<Long, Long>(2L, 123L), new Tuple2<Integer, Long>(555, 1L))
+               
+       };
+       
+       @SuppressWarnings("unchecked")
+       @Override
+       protected TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, 
Long>, Tuple2<Integer, Long>>> createComparator(
+                       boolean ascending) {
+               return new TupleComparator<Tuple3<Tuple2<String, Double>, 
Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
+                               new int[] { 0 },
+                               new TypeComparator[] {
+                                               new 
TupleComparator<Tuple2<String, Double>>(
+                                                               new int[] { 0, 
1 },
+                                                               new 
TypeComparator[] {
+                                                               new 
StringComparator(ascending),
+                                                               new 
DoubleComparator(ascending) },
+                                                               new 
TypeSerializer[] {
+                                                                               
StringSerializer.INSTANCE,
+                                                                               
DoubleSerializer.INSTANCE }) },
+                               new TypeSerializer[] {
+                                               new 
TupleSerializer<Tuple2<String, Double>>(
+                                                               
(Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
+                                                               new 
TypeSerializer[] {
+                                                                               
StringSerializer.INSTANCE,
+                                                                               
DoubleSerializer.INSTANCE }),
+                                               new 
TupleSerializer<Tuple2<Long, Long>>(
+                                                               
(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
+                                                               new 
TypeSerializer[] {
+                                                                               
LongSerializer.INSTANCE,
+                                                                               
LongSerializer.INSTANCE }),
+                                               new 
TupleSerializer<Tuple2<Integer, Long>>(
+                                                               
(Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
+                                                               new 
TypeSerializer[] {
+                                                                               
IntSerializer.INSTANCE,
+                                                                               
LongSerializer.INSTANCE }) });
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       protected TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, 
Long>, Tuple2<Integer, Long>>> createSerializer() {
+               return new  TupleSerializer<Tuple3<Tuple2<String, Double>, 
Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
+                               (Class<Tuple3<Tuple2<String, Double>, 
Tuple2<Long, Long>, Tuple2<Integer, Long>>>) (Class<?>) Tuple3.class,
+                               new TypeSerializer[]{
+                                       new TupleSerializer<Tuple2<String, 
Double>> (
+                                                       (Class<Tuple2<String, 
Double>>) (Class<?>) Tuple2.class,
+                                                       new TypeSerializer[]{
+                                                                       
StringSerializer.INSTANCE,
+                                                                       
DoubleSerializer.INSTANCE
+                                       }),
+                                       new TupleSerializer<Tuple2<Long, Long>> 
(
+                                                       (Class<Tuple2<Long, 
Long>>) (Class<?>) Tuple2.class,
+                                                       new TypeSerializer[]{
+                                                                       
LongSerializer.INSTANCE,
+                                                                       
LongSerializer.INSTANCE
+                                       }),
+                                       new TupleSerializer<Tuple2<Integer, 
Long>> (
+                                                       (Class<Tuple2<Integer, 
Long>>) (Class<?>) Tuple2.class,
+                                                       new TypeSerializer[]{
+                                                                       
IntSerializer.INSTANCE,
+                                                                       
LongSerializer.INSTANCE
+                                       })
+                               });
+       }
+
+       @Override
+       protected Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>[] getSortedTestData() {
+               return this.dataISD;
+       }
+       
+       @Override
+       protected void deepEquals(
+                       String message,
+                       Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>> should,
+                       Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>> is) {
+               
+               for (int x = 0; x < should.getArity(); x++) {
+                       // Check whether field is of type Tuple2 because 
assertEquals must be called on the non Tuple2 fields.
+                       if(should.getField(x) instanceof Tuple2) {
+                               this.deepEquals(message, (Tuple2<?,?>) 
should.getField(x), (Tuple2<?,?>)is.getField(x));
+                       }
+                       else {
+                               assertEquals(message, should.getField(x), 
is.getField(x));
+                       }
+               }// For
+       }
+       
+       protected void deepEquals(String message, Tuple2<?,?> should, 
Tuple2<?,?> is) {
+               for (int x = 0; x < should.getArity(); x++) {
+                       assertEquals(message, should.getField(x), 
is.getField(x));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java
new file mode 100644
index 0000000..4b07c61
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringComparator;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+public class TupleComparatorTTT2Test extends 
TupleComparatorTestBase<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>> {
+
+       @SuppressWarnings("unchecked")
+       Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, 
Long>>[] dataISD = new Tuple3[]{
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 1.0), new 
Tuple2<Long, Long>(1L, 1L), new Tuple2<Integer, Long>(4, -10L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 2.0), new 
Tuple2<Long, Long>(1L, 2L), new Tuple2<Integer, Long>(4, -5L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.0), new 
Tuple2<Long, Long>(1L, 3L), new Tuple2<Integer, Long>(4, 0L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.5), new 
Tuple2<Long, Long>(1L, 4L), new Tuple2<Integer, Long>(4, 5L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 4325.12), new 
Tuple2<Long, Long>(1L, 5L), new Tuple2<Integer, Long>(4, 15L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 1.0), new 
Tuple2<Long, Long>(2L, 4L), new Tuple2<Integer, Long>(45, -5L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 2.0), new 
Tuple2<Long, Long>(2L, 6L), new Tuple2<Integer, Long>(45, 5L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.0), new 
Tuple2<Long, Long>(2L, 8L), new Tuple2<Integer, Long>(323, 2L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.5), new 
Tuple2<Long, Long>(2L, 9L), new Tuple2<Integer, Long>(323, 5L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 4325.12), new 
Tuple2<Long, Long>(2L, 123L), new Tuple2<Integer, Long>(555, 1L))
+               
+       };
+       
+       @SuppressWarnings("unchecked")
+       @Override
+       protected TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, 
Long>, Tuple2<Integer, Long>>> createComparator(
+                       boolean ascending) {
+               return new TupleComparator<Tuple3<Tuple2<String, Double>, 
Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
+                               new int[] { 0, 2 },
+                               new TypeComparator[] {
+                                               new 
TupleComparator<Tuple2<String, Double>>(
+                                                               new int[] { 0, 
1 },
+                                                               new 
TypeComparator[] {
+                                                               new 
StringComparator(ascending),
+                                                               new 
DoubleComparator(ascending) },
+                                                               new 
TypeSerializer[] {
+                                                                               
StringSerializer.INSTANCE,
+                                                                               
DoubleSerializer.INSTANCE }),
+                                               new 
TupleComparator<Tuple2<Integer, Long>>(
+                                                               new int[] {     
0, 1 },
+                                                               new 
TypeComparator[] {
+                                                               new 
IntComparator(ascending),
+                                                               new 
LongComparator(ascending) },
+                                                               new 
TypeSerializer[] {
+                                                                               
IntSerializer.INSTANCE,
+                                                                               
LongSerializer.INSTANCE }) },
+                               new TypeSerializer[] {
+                                               new 
TupleSerializer<Tuple2<String, Double>>(
+                                                               
(Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
+                                                               new 
TypeSerializer[] {
+                                                                               
StringSerializer.INSTANCE,
+                                                                               
DoubleSerializer.INSTANCE }),
+                                               new 
TupleSerializer<Tuple2<Long, Long>>(
+                                                               
(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
+                                                               new 
TypeSerializer[] {
+                                                                               
LongSerializer.INSTANCE,
+                                                                               
LongSerializer.INSTANCE }),
+                                               new 
TupleSerializer<Tuple2<Integer, Long>>(
+                                                               
(Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
+                                                               new 
TypeSerializer[] {
+                                                                               
IntSerializer.INSTANCE,
+                                                                               
LongSerializer.INSTANCE }) });
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       protected TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, 
Long>, Tuple2<Integer, Long>>> createSerializer() {
+               return new  TupleSerializer<Tuple3<Tuple2<String, Double>, 
Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
+                               (Class<Tuple3<Tuple2<String, Double>, 
Tuple2<Long, Long>, Tuple2<Integer, Long>>>) (Class<?>) Tuple3.class,
+                               new TypeSerializer[]{
+                                       new TupleSerializer<Tuple2<String, 
Double>> (
+                                                       (Class<Tuple2<String, 
Double>>) (Class<?>) Tuple2.class,
+                                                       new TypeSerializer[]{
+                                                                       
StringSerializer.INSTANCE,
+                                                                       
DoubleSerializer.INSTANCE}),
+                                       new TupleSerializer<Tuple2<Long, Long>> 
(
+                                                       (Class<Tuple2<Long, 
Long>>) (Class<?>) Tuple2.class,
+                                                       new TypeSerializer[]{
+                                                                       
LongSerializer.INSTANCE,
+                                                                       
LongSerializer.INSTANCE}),
+                                       new TupleSerializer<Tuple2<Integer, 
Long>> (
+                                                       (Class<Tuple2<Integer, 
Long>>) (Class<?>) Tuple2.class,
+                                                       new TypeSerializer[]{
+                                                                       
IntSerializer.INSTANCE,
+                                                                       
LongSerializer.INSTANCE})
+                               });
+       }
+
+       @Override
+       protected Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>[] getSortedTestData() {
+               return this.dataISD;
+       }
+       
+       @Override
+       protected void deepEquals(
+                       String message,
+                       Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>> should,
+                       Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>> is) {
+               
+               for (int x = 0; x < should.getArity(); x++) {
+                       // Check whether field is of type Tuple2 because 
assertEquals must be called on the non Tuple2 fields.
+                       if(should.getField(x) instanceof Tuple2) {
+                               this.deepEquals(message, (Tuple2<?,?>) 
should.getField(x), (Tuple2<?,?>)is.getField(x));
+                       }
+                       else {
+                               assertEquals(message, should.getField(x), 
is.getField(x));
+                       }
+               }// For
+       }
+       
+       protected void deepEquals(String message, Tuple2<?,?> should, 
Tuple2<?,?> is) {
+               for (int x = 0; x < should.getArity(); x++) {
+                       assertEquals(message, should.getField(x), 
is.getField(x));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java
new file mode 100644
index 0000000..0dfc094
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringComparator;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+public class TupleComparatorTTT3Test extends 
TupleComparatorTestBase<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>>{
+       @SuppressWarnings("unchecked")
+       Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, 
Long>>[] dataISD = new Tuple3[]{
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 1.0), new 
Tuple2<Long, Long>(1L, 1L), new Tuple2<Integer, Long>(4, -10L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 2.0), new 
Tuple2<Long, Long>(1L, 2L), new Tuple2<Integer, Long>(4, -5L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.0), new 
Tuple2<Long, Long>(1L, 3L), new Tuple2<Integer, Long>(4, 0L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.5), new 
Tuple2<Long, Long>(1L, 4L), new Tuple2<Integer, Long>(4, 5L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 4325.12), new 
Tuple2<Long, Long>(1L, 5L), new Tuple2<Integer, Long>(4, 15L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 1.0), new 
Tuple2<Long, Long>(2L, 4L), new Tuple2<Integer, Long>(45, -5L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 2.0), new 
Tuple2<Long, Long>(2L, 6L), new Tuple2<Integer, Long>(45, 5L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.0), new 
Tuple2<Long, Long>(2L, 8L), new Tuple2<Integer, Long>(323, 2L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.5), new 
Tuple2<Long, Long>(2L, 9L), new Tuple2<Integer, Long>(323, 5L)),
+               new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 4325.12), new 
Tuple2<Long, Long>(2L, 123L), new Tuple2<Integer, Long>(555, 1L))
+               
+       };
+       
+       @SuppressWarnings("unchecked")
+       @Override
+       protected TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, 
Long>, Tuple2<Integer, Long>>> createComparator(
+                       boolean ascending) {
+               return new TupleComparator<Tuple3<Tuple2<String, Double>, 
Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
+                               new int[] { 0, 1, 2 },
+                               new TypeComparator[] {
+                                               new 
TupleComparator<Tuple2<String, Double>>(
+                                                               new int[] { 0, 
1 },
+                                                               new 
TypeComparator[] {
+                                                               new 
StringComparator(ascending),
+                                                               new 
DoubleComparator(ascending) },
+                                                               new 
TypeSerializer[] {
+                                                                               
StringSerializer.INSTANCE,
+                                                                               
DoubleSerializer.INSTANCE }),
+                                               new 
TupleComparator<Tuple2<Long, Long>>(
+                                                               new int[] { 0, 
1 },
+                                                               new 
TypeComparator[] {
+                                                               new 
LongComparator(ascending),
+                                                               new 
LongComparator(ascending) },
+                                                               new 
TypeSerializer[] {
+                                                                               
LongSerializer.INSTANCE,
+                                                                               
LongSerializer.INSTANCE }),
+                                               new 
TupleComparator<Tuple2<Integer, Long>>(
+                                                               new int[] {     
0, 1 },
+                                                               new 
TypeComparator[] {
+                                                               new 
IntComparator(ascending),
+                                                               new 
LongComparator(ascending) },
+                                                               new 
TypeSerializer[] {
+                                                                               
IntSerializer.INSTANCE,
+                                                                               
LongSerializer.INSTANCE }) },
+                               new TypeSerializer[] {
+                                               new 
TupleSerializer<Tuple2<String, Double>>(
+                                                               
(Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
+                                                               new 
TypeSerializer[] {
+                                                                               
StringSerializer.INSTANCE,
+                                                                               
DoubleSerializer.INSTANCE }),
+                                               new 
TupleSerializer<Tuple2<Long, Long>>(
+                                                               
(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
+                                                               new 
TypeSerializer[] {
+                                                                               
LongSerializer.INSTANCE,
+                                                                               
LongSerializer.INSTANCE }),
+                                               new 
TupleSerializer<Tuple2<Integer, Long>>(
+                                                               
(Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
+                                                               new 
TypeSerializer[] {
+                                                                               
IntSerializer.INSTANCE,
+                                                                               
LongSerializer.INSTANCE }) });
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       protected TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, 
Long>, Tuple2<Integer, Long>>> createSerializer() {
+               return new  TupleSerializer<Tuple3<Tuple2<String, Double>, 
Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
+                               (Class<Tuple3<Tuple2<String, Double>, 
Tuple2<Long, Long>, Tuple2<Integer, Long>>>) (Class<?>) Tuple3.class,
+                               new TypeSerializer[]{
+                                       new TupleSerializer<Tuple2<String, 
Double>> (
+                                                       (Class<Tuple2<String, 
Double>>) (Class<?>) Tuple2.class,
+                                                       new TypeSerializer[]{
+                                                                       
StringSerializer.INSTANCE,
+                                                                       
DoubleSerializer.INSTANCE
+                                       }),
+                                       new TupleSerializer<Tuple2<Long, Long>> 
(
+                                                       (Class<Tuple2<Long, 
Long>>) (Class<?>) Tuple2.class,
+                                                       new TypeSerializer[]{
+                                                                       
LongSerializer.INSTANCE,
+                                                                       
LongSerializer.INSTANCE
+                                       }),
+                                       new TupleSerializer<Tuple2<Integer, 
Long>> (
+                                                       (Class<Tuple2<Integer, 
Long>>) (Class<?>) Tuple2.class,
+                                                       new TypeSerializer[]{
+                                                                       
IntSerializer.INSTANCE,
+                                                                       
LongSerializer.INSTANCE
+                                       })
+                               });
+       }
+
+       @Override
+       protected Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>[] getSortedTestData() {
+               return this.dataISD;
+       }
+       
+       @Override
+       protected void deepEquals(
+                       String message,
+                       Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>> should,
+                       Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>> is) {
+               
+               for (int x = 0; x < should.getArity(); x++) {
+                       // Check whether field is of type Tuple2 because 
assertEquals must be called on the non Tuple2 fields.
+                       if(should.getField(x) instanceof Tuple2) {
+                               this.deepEquals(message, (Tuple2<?,?>) 
should.getField(x), (Tuple2<?,?>)is.getField(x));
+                       }
+                       else {
+                               assertEquals(message, should.getField(x), 
is.getField(x));
+                       }
+               }// For
+       }
+       
+       protected void deepEquals(String message, Tuple2<?,?> should, 
Tuple2<?,?> is) {
+               for (int x = 0; x < should.getArity(); x++) {
+                       assertEquals(message, should.getField(x), 
is.getField(x));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
new file mode 100644
index 0000000..017eb44
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.util.ArrayList;
+import java.util.Random;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import 
org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.Book;
+import 
org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.BookAuthor;
+import 
org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.ComplexNestedObject1;
+import 
org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.ComplexNestedObject2;
+import 
org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.SimpleTypes;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.StringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TupleSerializerTest {
+
+       @Test
+       public void testTuple0() {
+               Tuple0[] testTuples = new Tuple0[] { Tuple0.INSTANCE, 
Tuple0.INSTANCE, Tuple0.INSTANCE };
+
+               runTests(testTuples);
+       }
+
+       @Test
+       public void testTuple1Int() {
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               Tuple1<Integer>[] testTuples = new Tuple1[] {
+                       new Tuple1<Integer>(42), new Tuple1<Integer>(1), new 
Tuple1<Integer>(0), new Tuple1<Integer>(-1),
+                       new Tuple1<Integer>(Integer.MAX_VALUE), new 
Tuple1<Integer>(Integer.MIN_VALUE)
+               };
+               
+               runTests(testTuples);
+       }
+       
+       @Test
+       public void testTuple1String() {
+               Random rnd = new Random(68761564135413L);
+               
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               Tuple1<String>[] testTuples = new Tuple1[] {
+                       new Tuple1<String>(StringUtils.getRandomString(rnd, 10, 
100)),
+                       new Tuple1<String>("abc"),
+                       new Tuple1<String>(""),
+                       new Tuple1<String>(StringUtils.getRandomString(rnd, 30, 
170)),
+                       new Tuple1<String>(StringUtils.getRandomString(rnd, 15, 
50)),
+                       new Tuple1<String>("")
+               };
+               
+               runTests(testTuples);
+       }
+       
+       @Test
+       public void testTuple1StringArray() {
+               Random rnd = new Random(289347567856686223L);
+               
+               String[] arr1 = new String[] {"abc", "",
+                               StringUtils.getRandomString(rnd, 10, 100),
+                               StringUtils.getRandomString(rnd, 15, 50),
+                               StringUtils.getRandomString(rnd, 30, 170),
+                               StringUtils.getRandomString(rnd, 14, 15),
+                               ""};
+               
+               String[] arr2 = new String[] {"foo", "",
+                               StringUtils.getRandomString(rnd, 10, 100),
+                               StringUtils.getRandomString(rnd, 1000, 5000),
+                               StringUtils.getRandomString(rnd, 30000, 35000),
+                               StringUtils.getRandomString(rnd, 100*1024, 
105*1024),
+                               "bar"};
+               
+               @SuppressWarnings("unchecked")
+               Tuple1<String[]>[] testTuples = new Tuple1[] {
+                       new Tuple1<String[]>(arr1),
+                       new Tuple1<String[]>(arr2)
+               };
+               
+               runTests(testTuples);
+       }
+       
+       @Test
+       public void testTuple2StringDouble() {
+               Random rnd = new Random(807346528946L);
+               
+               @SuppressWarnings("unchecked")
+               Tuple2<String, Double>[] testTuples = new Tuple2[] {
+                               new Tuple2<String, 
Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble()),
+                               new Tuple2<String, 
Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble()),
+                               new Tuple2<String, 
Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble()),
+                               new Tuple2<String, Double>("", 
rnd.nextDouble()),
+                               new Tuple2<String, 
Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble()),
+                               new Tuple2<String, 
Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble())
+                       };
+               
+               runTests(testTuples);
+       }
+       
+       @Test
+       public void testTuple2StringStringArray() {
+               Random rnd = new Random(289347567856686223L);
+               
+               String[] arr1 = new String[] {"abc", "",
+                               StringUtils.getRandomString(rnd, 10, 100),
+                               StringUtils.getRandomString(rnd, 15, 50),
+                               StringUtils.getRandomString(rnd, 30, 170),
+                               StringUtils.getRandomString(rnd, 14, 15),
+                               ""};
+               
+               String[] arr2 = new String[] {"foo", "",
+                               StringUtils.getRandomString(rnd, 10, 100),
+                               StringUtils.getRandomString(rnd, 1000, 5000),
+                               StringUtils.getRandomString(rnd, 30000, 35000),
+                               StringUtils.getRandomString(rnd, 100*1024, 
105*1024),
+                               "bar"};
+               
+               @SuppressWarnings("unchecked")
+               Tuple2<String, String[]>[] testTuples = new Tuple2[] {
+                       new Tuple2<String, 
String[]>(StringUtils.getRandomString(rnd, 30, 170), arr1),
+                       new Tuple2<String, 
String[]>(StringUtils.getRandomString(rnd, 30, 170), arr2),
+                       new Tuple2<String, 
String[]>(StringUtils.getRandomString(rnd, 30, 170), arr1),
+                       new Tuple2<String, 
String[]>(StringUtils.getRandomString(rnd, 30, 170), arr2),
+                       new Tuple2<String, 
String[]>(StringUtils.getRandomString(rnd, 30, 170), arr2)
+               };
+               
+               runTests(testTuples);
+       }
+       
+
+       @Test
+       public void testTuple5CustomObjects() {
+               Random rnd = new Random(807346528946L);
+               
+               SimpleTypes a = new SimpleTypes();
+               SimpleTypes b = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), 
(byte) rnd.nextInt(),
+                               StringUtils.getRandomString(rnd, 10, 100), 
(short) rnd.nextInt(), rnd.nextDouble());
+               SimpleTypes c = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), 
(byte) rnd.nextInt(),
+                               StringUtils.getRandomString(rnd, 10, 100), 
(short) rnd.nextInt(), rnd.nextDouble());
+               SimpleTypes d = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), 
(byte) rnd.nextInt(),
+                               StringUtils.getRandomString(rnd, 10, 100), 
(short) rnd.nextInt(), rnd.nextDouble());
+               SimpleTypes e = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), 
(byte) rnd.nextInt(),
+                               StringUtils.getRandomString(rnd, 10, 100), 
(short) rnd.nextInt(), rnd.nextDouble());
+               SimpleTypes f = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), 
(byte) rnd.nextInt(),
+                               StringUtils.getRandomString(rnd, 10, 100), 
(short) rnd.nextInt(), rnd.nextDouble());
+               SimpleTypes g = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), 
(byte) rnd.nextInt(),
+                               StringUtils.getRandomString(rnd, 10, 100), 
(short) rnd.nextInt(), rnd.nextDouble());
+               
+               ComplexNestedObject1 o1 = new ComplexNestedObject1(5626435);
+               ComplexNestedObject1 o2 = new ComplexNestedObject1(76923);
+               ComplexNestedObject1 o3 = new ComplexNestedObject1(-1100);
+               ComplexNestedObject1 o4 = new ComplexNestedObject1(0);
+               ComplexNestedObject1 o5 = new ComplexNestedObject1(44);
+               
+               ComplexNestedObject2 co1 = new ComplexNestedObject2(rnd);
+               ComplexNestedObject2 co2 = new ComplexNestedObject2();
+               ComplexNestedObject2 co3 = new ComplexNestedObject2(rnd);
+               ComplexNestedObject2 co4 = new ComplexNestedObject2(rnd);
+               
+               Book b1 = new Book(976243875L, "The Serialization Odysse", 42);
+               Book b2 = new Book(0L, "Debugging byte streams", 1337);
+               Book b3 = new Book(-1L, "Low level interfaces", 0xC0FFEE);
+               Book b4 = new Book(Long.MAX_VALUE, "The joy of bits and bytes", 
0xDEADBEEF);
+               Book b5 = new Book(Long.MIN_VALUE, "Winnign a prize for 
creative test strings", 0xBADF00);
+               Book b6 = new Book(-2L, "Distributed Systems", 
0xABCDEF0123456789L);
+               
+               ArrayList<String> list = new ArrayList<String>();
+               list.add("A");
+               list.add("B");
+               list.add("C");
+               list.add("D");
+               list.add("E");
+               
+               BookAuthor ba1 = new BookAuthor(976243875L, list, "Arno Nym");
+               
+               ArrayList<String> list2 = new ArrayList<String>();
+               BookAuthor ba2 = new BookAuthor(987654321L, list2, "The 
Saurus");
+               
+               
+               @SuppressWarnings("unchecked")
+               Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, 
ComplexNestedObject2>[] testTuples = new Tuple5[] {
+                               new Tuple5<SimpleTypes, Book, 
ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(a, b1, o1, ba1, co1),
+                               new Tuple5<SimpleTypes, Book, 
ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(b, b2, o2, ba2, co2),
+                               new Tuple5<SimpleTypes, Book, 
ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(c, b3, o3, ba1, co3),
+                               new Tuple5<SimpleTypes, Book, 
ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(d, b2, o4, ba1, co4),
+                               new Tuple5<SimpleTypes, Book, 
ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(e, b4, o5, ba2, co4),
+                               new Tuple5<SimpleTypes, Book, 
ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(f, b5, o1, ba2, co4),
+                               new Tuple5<SimpleTypes, Book, 
ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(g, b6, o4, ba1, co2)
+               };
+               
+               runTests(testTuples);
+       }
+
+       private <T extends Tuple> void runTests(T... instances) {
+               try {
+                       TupleTypeInfo<T> tupleTypeInfo = (TupleTypeInfo<T>) 
TypeExtractor.getForObject(instances[0]);
+                       TypeSerializer<T> serializer = 
tupleTypeInfo.createSerializer(new ExecutionConfig());
+                       
+                       Class<T> tupleClass = tupleTypeInfo.getTypeClass();
+                       
+                       int length = -1;
+                       if(tupleClass == Tuple0.class) {
+                               length = 1;
+                       }
+                       TupleSerializerTestInstance<T> test = new 
TupleSerializerTestInstance<T>(serializer, tupleClass, length, instances);
+                       test.testAll();
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       Assert.fail(e.getMessage());
+               }
+       }
+}

Reply via email to