http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java new file mode 100644 index 0000000..8c61a19 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import com.google.common.base.Objects; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.junit.Test; + +import java.util.Random; + +/** + * A test for the {@link PojoSerializer}. 
+ */
+public class PojoSubclassSerializerTest extends SerializerTestBase<PojoSubclassSerializerTest.TestUserClassBase> {
+    private TypeInformation<TestUserClassBase> type = TypeExtractor.getForClass(TestUserClassBase.class);
+
+    /**
+     * Creates the serializer under test from an {@link ExecutionConfig} in which only
+     * {@link TestUserClass1} is registered as a POJO type.
+     *
+     * @return the serializer produced by the type information of the abstract base class
+     */
+    @Override
+    protected TypeSerializer<TestUserClassBase> createSerializer() {
+        // only register one of the three child classes, the third child class is NO POJO
+        ExecutionConfig conf = new ExecutionConfig();
+        conf.registerPojoType(TestUserClass1.class);
+        TypeSerializer<TestUserClassBase> serializer = type.createSerializer(conf);
+        // A bare `assert` is skipped when the JVM runs without -ea; a JUnit assertion
+        // keeps this sanity check active in every run.
+        org.junit.Assert.assertTrue(serializer instanceof PojoSerializer);
+        return serializer;
+    }
+
+    /**
+     * @return {@code -1}. NOTE(review): presumably the marker for a variable-length
+     *         type; confirm against SerializerTestBase.
+     */
+    @Override
+    protected int getLength() {
+        return -1;
+    }
+
+    /** @return the abstract base class that all test records extend */
+    @Override
+    protected Class<TestUserClassBase> getTypeClass() {
+        return TestUserClassBase.class;
+    }
+
+    /**
+     * Builds one instance of each of the three subclasses. The random generator is seeded
+     * with a constant, so the field values are identical on every run.
+     */
+    @Override
+    protected TestUserClassBase[] getTestData() {
+        Random rnd = new Random(874597969123412341L);
+
+        return new TestUserClassBase[]{
+            new TestUserClass1(rnd.nextInt(), "foo", rnd.nextLong()),
+            new TestUserClass2(rnd.nextInt(), "bar", rnd.nextFloat()),
+            new TestUserClass3(rnd.nextInt(), "bar", rnd.nextFloat())
+        };
+
+    }
+
+    /** Overrides the inherited test with a no-op. */
+    @Override
+    @Test
+    public void testInstantiate() {
+        // don't do anything, since the PojoSerializer with subclass will return null
+    }
+
+    // User code class for testing the serializer
+    public static abstract class TestUserClassBase {
+        public int dumm1;
+        public String dumm2;
+
+
+        public TestUserClassBase() {
+        }
+
+        public TestUserClassBase(int dumm1, String dumm2) {
+            this.dumm1 = dumm1;
+            this.dumm2 = dumm2;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(dumm1, dumm2);
+        }
+
+        /**
+         * Compares the two fields declared in this class. The string comparison is
+         * null-safe, so instances created through the no-argument constructor (where
+         * {@code dumm2} is still {@code null}) can be compared without an NPE.
+         */
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof TestUserClassBase)) {
+                return false;
+            }
+            TestUserClassBase otherTUC = (TestUserClassBase) other;
+            return dumm1 == otherTUC.dumm1 && Objects.equal(dumm2, otherTUC.dumm2);
+        }
+    }
+
+    /** Subclass with an extra {@code long} field; the only subclass registered in the config. */
+    public static class TestUserClass1 extends TestUserClassBase {
+        public long dumm3;
+
+        public TestUserClass1() {
+        }
+
+        public TestUserClass1(int dumm1, String dumm2, long dumm3) {
+            super(dumm1, dumm2);
+            this.dumm3 = dumm3;
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof TestUserClass1)) {
+                return false;
+            }
+            // super.equals covers dumm1 and dumm2; only the extra field is checked here.
+            return super.equals(other) && dumm3 == ((TestUserClass1) other).dumm3;
+        }
+    }
+
+    /** Subclass with an extra {@code float} field; not registered in the config. */
+    public static class TestUserClass2 extends TestUserClassBase {
+        public float dumm4;
+
+        public TestUserClass2() {
+        }
+
+        public TestUserClass2(int dumm1, String dumm2, float dumm4) {
+            super(dumm1, dumm2);
+            this.dumm4 = dumm4;
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof TestUserClass2)) {
+                return false;
+            }
+            // Float.compare keeps equals reflexive even for NaN, unlike `!=`.
+            return super.equals(other) && Float.compare(dumm4, ((TestUserClass2) other).dumm4) == 0;
+        }
+    }
+
+    /**
+     * Subclass without a no-argument constructor.
+     * NOTE(review): presumably the missing constructor is what makes this the "NO POJO"
+     * class mentioned in createSerializer(); do not add one without confirming.
+     */
+    public static class TestUserClass3 extends TestUserClassBase {
+        public float dumm4;
+
+        public TestUserClass3(int dumm1, String dumm2, float dumm4) {
+            super(dumm1, dumm2);
+            this.dumm4 = dumm4;
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof TestUserClass3)) {
+                return false;
+            }
+            // Float.compare keeps equals reflexive even for NaN, unlike `!=`.
+            return super.equals(other) && Float.compare(dumm4, ((TestUserClass3) other).dumm4) == 0;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java new file mode 100644 index 0000000..7c608f2 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A {@link Writable} that wraps a {@code String[]}. Instances are ordered by array length
+ * first and then element by element.
+ *
+ * <p>Wire format written by {@link #write(DataOutput)}: the element count as an int,
+ * followed for every element by its byte length as an int and its UTF-8 encoded bytes.
+ */
+public class StringArrayWritable implements Writable, Comparable<StringArrayWritable> {
+
+    /**
+     * Fixed charset for both directions, so the bytes do not depend on the platform
+     * default of the JVM that happens to write or read them.
+     */
+    private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+    private String[] array = new String[0];
+
+    /** Creates an instance holding an empty array. */
+    public StringArrayWritable() {
+        super();
+    }
+
+    /**
+     * Creates an instance around the given array. The array is stored as-is, not copied.
+     *
+     * @param array the strings to wrap
+     */
+    public StringArrayWritable(String[] array) {
+        this.array = array;
+    }
+
+    /**
+     * Writes the element count and then each string as a length-prefixed UTF-8 byte sequence.
+     *
+     * @throws IOException if the underlying output fails
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(this.array.length);
+
+        for (String str : this.array) {
+            byte[] b = str.getBytes(UTF_8);
+            out.writeInt(b.length);
+            out.write(b);
+        }
+    }
+
+    /**
+     * Replaces the wrapped array with the strings read from {@code in}; the exact inverse
+     * of {@link #write(DataOutput)}.
+     *
+     * @throws IOException if the underlying input fails or ends prematurely
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.array = new String[in.readInt()];
+
+        for (int i = 0; i < this.array.length; i++) {
+            byte[] b = new byte[in.readInt()];
+            in.readFully(b);
+            this.array[i] = new String(b, UTF_8);
+        }
+    }
+
+    /**
+     * Orders by array length; arrays of equal length are ordered by the first pair of
+     * elements that differ under {@link String#compareTo(String)}.
+     */
+    @Override
+    public int compareTo(StringArrayWritable o) {
+        // Array lengths are never negative, so the subtraction cannot overflow.
+        if (this.array.length != o.array.length) {
+            return this.array.length - o.array.length;
+        }
+
+        for (int i = 0; i < this.array.length; i++) {
+            int comp = this.array[i].compareTo(o.array[i]);
+            if (comp != 0) {
+                return comp;
+            }
+        }
+        return 0;
+    }
+
+    /** Two instances are equal when {@link #compareTo(StringArrayWritable)} returns 0. */
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof StringArrayWritable)) {
+            return false;
+        }
+        return this.compareTo((StringArrayWritable) obj) == 0;
+    }
+
+    /**
+     * Hash over the wrapped strings. Instances that are equal hold element-wise equal
+     * arrays of the same length, so they produce the same hash.
+     */
+    @Override
+    public int hashCode() {
+        return Arrays.hashCode(this.array);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
new file mode 100644
index 0000000..b797090
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import com.google.common.base.Objects; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.junit.Test; + +import java.util.Random; + +/** + * Testing the serialization of classes which are subclasses of a class that implements an interface. 
+ */
+public class SubclassFromInterfaceSerializerTest extends SerializerTestBase<SubclassFromInterfaceSerializerTest.TestUserInterface> {
+    private TypeInformation<TestUserInterface> type = TypeExtractor.getForClass(TestUserInterface.class);
+
+    /**
+     * Creates the serializer under test from an {@link ExecutionConfig} in which only
+     * {@link TestUserClass2} is registered.
+     *
+     * @return the serializer produced by the type information of the interface
+     */
+    @Override
+    protected TypeSerializer<TestUserInterface> createSerializer() {
+        // only register one of the two child classes
+        ExecutionConfig conf = new ExecutionConfig();
+        conf.registerPojoType(TestUserClass2.class);
+        TypeSerializer<TestUserInterface> serializer = type.createSerializer(conf);
+        // A bare `assert` is skipped when the JVM runs without -ea; a JUnit assertion
+        // keeps this sanity check active in every run.
+        org.junit.Assert.assertTrue(serializer instanceof KryoSerializer);
+        return serializer;
+    }
+
+    /**
+     * @return {@code -1}. NOTE(review): presumably the marker for a variable-length
+     *         type; confirm against SerializerTestBase.
+     */
+    @Override
+    protected int getLength() {
+        return -1;
+    }
+
+    /** @return the interface that all test records implement */
+    @Override
+    protected Class<TestUserInterface> getTypeClass() {
+        return TestUserInterface.class;
+    }
+
+    /**
+     * Builds one instance of each of the two subclasses. The random generator is seeded
+     * with a constant, so the field values are identical on every run.
+     */
+    @Override
+    protected TestUserInterface[] getTestData() {
+        Random rnd = new Random(874597969123412341L);
+
+        return new TestUserInterface[]{
+            new TestUserClass1(rnd.nextInt(), "foo", rnd.nextLong()),
+            new TestUserClass2(rnd.nextInt(), "bar", rnd.nextFloat())
+        };
+
+    }
+
+    /** Overrides the inherited test with a no-op. */
+    @Override
+    @Test
+    public void testInstantiate() {
+        // Intentionally empty.
+        // NOTE(review): the previous comment here said the PojoSerializer returns null for
+        // subclasses, but createSerializer() asserts a KryoSerializer in this test;
+        // confirm the actual reason for skipping.
+    }
+
+    /** Marker interface used as the declared type of the test records. */
+    public interface TestUserInterface {}
+
+    // User code class for testing the serializer
+    public static class TestUserClassBase implements TestUserInterface {
+        public int dumm1;
+        public String dumm2;
+
+
+        public TestUserClassBase() {
+        }
+
+        public TestUserClassBase(int dumm1, String dumm2) {
+            this.dumm1 = dumm1;
+            this.dumm2 = dumm2;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(dumm1, dumm2);
+        }
+
+        /**
+         * Compares the two fields declared in this class. The string comparison is
+         * null-safe, so instances created through the no-argument constructor (where
+         * {@code dumm2} is still {@code null}) can be compared without an NPE.
+         */
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof TestUserClassBase)) {
+                return false;
+            }
+            TestUserClassBase otherTUC = (TestUserClassBase) other;
+            return dumm1 == otherTUC.dumm1 && Objects.equal(dumm2, otherTUC.dumm2);
+        }
+    }
+
+    /** Subclass with an extra {@code long} field; not registered in the config. */
+    public static class TestUserClass1 extends TestUserClassBase {
+        public long dumm3;
+
+        public TestUserClass1() {
+        }
+
+        public TestUserClass1(int dumm1, String dumm2, long dumm3) {
+            super(dumm1, dumm2);
+            this.dumm3 = dumm3;
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof TestUserClass1)) {
+                return false;
+            }
+            // super.equals covers dumm1 and dumm2; only the extra field is checked here.
+            return super.equals(other) && dumm3 == ((TestUserClass1) other).dumm3;
+        }
+    }
+
+    /** Subclass with an extra {@code float} field; the only subclass registered in the config. */
+    public static class TestUserClass2 extends TestUserClassBase {
+        public float dumm4;
+
+        public TestUserClass2() {
+        }
+
+        public TestUserClass2(int dumm1, String dumm2, float dumm4) {
+            super(dumm1, dumm2);
+            this.dumm4 = dumm4;
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof TestUserClass2)) {
+                return false;
+            }
+            // Float.compare keeps equals reflexive even for NaN, unlike `!=`.
+            return super.equals(other) && Float.compare(dumm4, ((TestUserClass2) other).dumm4) == 0;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
new file mode 100644
index 0000000..87be6db
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemoryUtils;
+
+/**
+ * A {@link DataOutputView} for tests that collects everything written to it in a
+ * {@code byte[]}. The array grows on demand, but never beyond the configured maximum size.
+ *
+ * <p>Multi-byte values are stored big-endian regardless of the platform byte order.
+ */
+public final class TestDataOutputSerializer implements DataOutputView {
+
+    /** Backing array; replaced by a larger one in {@link #resize(int)}. */
+    private byte[] buffer;
+
+    /** Index of the next byte to write, which is also the number of valid bytes. */
+    private int position;
+
+    /** ByteBuffer view of {@link #buffer}; re-created whenever the array is replaced. */
+    private ByteBuffer wrapper;
+
+    /** Upper bound for the length of {@link #buffer}. */
+    private final int maxSize;
+
+
+    /**
+     * Creates a serializer that may grow up to {@link Integer#MAX_VALUE} bytes.
+     *
+     * @param startSize initial buffer length, at least 1
+     */
+    public TestDataOutputSerializer(int startSize) {
+        this(startSize, Integer.MAX_VALUE);
+    }
+
+    /**
+     * @param startSize initial buffer length, at least 1 and at most {@code maxSize}
+     * @param maxSize   largest length the buffer may grow to
+     * @throws IllegalArgumentException if {@code startSize} is outside {@code [1, maxSize]}
+     */
+    public TestDataOutputSerializer(int startSize, int maxSize) {
+        if (startSize < 1 || startSize > maxSize) {
+            throw new IllegalArgumentException(
+                "startSize must be between 1 and maxSize, but startSize=" + startSize + ", maxSize=" + maxSize);
+        }
+
+        this.buffer = new byte[startSize];
+        this.wrapper = ByteBuffer.wrap(buffer);
+        this.maxSize = maxSize;
+    }
+
+    /**
+     * Returns the shared ByteBuffer view, positioned at 0 and limited to the bytes written
+     * so far. The same instance is handed out on every call until the buffer is resized.
+     */
+    public ByteBuffer wrapAsByteBuffer() {
+        this.wrapper.position(0);
+        this.wrapper.limit(this.position);
+        return this.wrapper;
+    }
+
+    /** @return a fresh array containing exactly the bytes written so far */
+    public byte[] copyByteBuffer() {
+        byte[] target = new byte[position];
+        System.arraycopy(buffer, 0, target, 0, position);
+
+        return target;
+    }
+
+    /** Resets the write position to 0; the backing array keeps its current size. */
+    public void clear() {
+        this.position = 0;
+    }
+
+    /** @return the number of bytes written since construction or the last {@link #clear()} */
+    public int length() {
+        return this.position;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("[pos=%d cap=%d]", this.position, this.buffer.length);
+    }
+
+    // ----------------------------------------------------------------------------------------
+    //                               Data Output
+    // ----------------------------------------------------------------------------------------
+
+    /** Writes the low-order byte of {@code b}. */
+    @Override
+    public void write(int b) throws IOException {
+        if (this.position >= this.buffer.length) {
+            resize(1);
+        }
+        this.buffer[this.position++] = (byte) (b & 0xff);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    /**
+     * Copies {@code len} bytes of {@code b}, starting at {@code off}, into the buffer.
+     *
+     * @throws ArrayIndexOutOfBoundsException if {@code len} is negative or the range
+     *         extends past the end of {@code b}
+     */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        if (len < 0 || off > b.length - len) {
+            throw new ArrayIndexOutOfBoundsException();
+        }
+        if (this.position > this.buffer.length - len) {
+            resize(len);
+        }
+        System.arraycopy(b, off, this.buffer, this.position, len);
+        this.position += len;
+    }
+
+    @Override
+    public void writeBoolean(boolean v) throws IOException {
+        write(v ? 1 : 0);
+    }
+
+    @Override
+    public void writeByte(int v) throws IOException {
+        write(v);
+    }
+
+    /** Writes the low-order byte of every character of {@code s}. */
+    @Override
+    public void writeBytes(String s) throws IOException {
+        final int sLen = s.length();
+        if (this.position >= this.buffer.length - sLen) {
+            resize(sLen);
+        }
+
+        // writeByte already advances the position by one per character. Adding sLen again
+        // afterwards (as the previous version did) left a gap of stale bytes and could move
+        // the position beyond the end of the buffer.
+        for (int i = 0; i < sLen; i++) {
+            writeByte(s.charAt(i));
+        }
+    }
+
+    /** Writes the two low-order bytes of {@code v}, high byte first. */
+    @Override
+    public void writeChar(int v) throws IOException {
+        if (this.position >= this.buffer.length - 1) {
+            resize(2);
+        }
+        this.buffer[this.position++] = (byte) (v >> 8);
+        this.buffer[this.position++] = (byte) v;
+    }
+
+    /** Writes every character of {@code s} as two bytes via {@link #writeChar(int)}. */
+    @Override
+    public void writeChars(String s) throws IOException {
+        final int sLen = s.length();
+        if (this.position >= this.buffer.length - 2*sLen) {
+            resize(2*sLen);
+        }
+        for (int i = 0; i < sLen; i++) {
+            writeChar(s.charAt(i));
+        }
+    }
+
+    @Override
+    public void writeDouble(double v) throws IOException {
+        writeLong(Double.doubleToLongBits(v));
+    }
+
+    @Override
+    public void writeFloat(float v) throws IOException {
+        writeInt(Float.floatToIntBits(v));
+    }
+
+    /** Writes {@code v} as four bytes, big-endian. */
+    @SuppressWarnings("restriction")
+    @Override
+    public void writeInt(int v) throws IOException {
+        if (this.position >= this.buffer.length - 3) {
+            resize(4);
+        }
+        // The value is byte-swapped first on little-endian platforms, so that the
+        // bytes end up big-endian in the buffer.
+        if (LITTLE_ENDIAN) {
+            v = Integer.reverseBytes(v);
+        }
+        UNSAFE.putInt(this.buffer, BASE_OFFSET + this.position, v);
+        this.position += 4;
+    }
+
+    /** Writes {@code v} as eight bytes, big-endian. */
+    @SuppressWarnings("restriction")
+    @Override
+    public void writeLong(long v) throws IOException {
+        if (this.position >= this.buffer.length - 7) {
+            resize(8);
+        }
+        // Same byte-order handling as in writeInt.
+        if (LITTLE_ENDIAN) {
+            v = Long.reverseBytes(v);
+        }
+        UNSAFE.putLong(this.buffer, BASE_OFFSET + this.position, v);
+        this.position += 8;
+    }
+
+    /** Writes the two low-order bytes of {@code v}, high byte first. */
+    @Override
+    public void writeShort(int v) throws IOException {
+        if (this.position >= this.buffer.length - 1) {
+            resize(2);
+        }
+        this.buffer[this.position++] = (byte) ((v >>> 8) & 0xff);
+        this.buffer[this.position++] = (byte) ((v >>> 0) & 0xff);
+    }
+
+    /**
+     * Writes a two-byte length followed by the characters of {@code str}, using one byte
+     * for characters in {@code 0x0001..0x007F}, three bytes for characters above
+     * {@code 0x07FF} and two bytes for everything else (including {@code 0x0000}).
+     *
+     * @throws UTFDataFormatException if the encoded form is longer than 65535 bytes
+     */
+    @Override
+    public void writeUTF(String str) throws IOException {
+        int strlen = str.length();
+        int utflen = 0;
+        int c;
+
+        /* use charAt instead of copying String to char array */
+        for (int i = 0; i < strlen; i++) {
+            c = str.charAt(i);
+            if ((c >= 0x0001) && (c <= 0x007F)) {
+                utflen++;
+            } else if (c > 0x07FF) {
+                utflen += 3;
+            } else {
+                utflen += 2;
+            }
+        }
+
+        if (utflen > 65535) {
+            throw new UTFDataFormatException("Encoded string is too long: " + utflen);
+        }
+        else if (this.position > this.buffer.length - utflen - 2) {
+            resize(utflen + 2);
+        }
+
+        // Read the field only after a possible resize, which replaces the array.
+        byte[] bytearr = this.buffer;
+        int count = this.position;
+
+        bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
+        bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
+
+        // Fast path: copy the leading run of single-byte characters.
+        int i = 0;
+        for (i = 0; i < strlen; i++) {
+            c = str.charAt(i);
+            if (!((c >= 0x0001) && (c <= 0x007F))) {
+                break;
+            }
+            bytearr[count++] = (byte) c;
+        }
+
+        // General path for the remainder of the string.
+        for (; i < strlen; i++) {
+            c = str.charAt(i);
+            if ((c >= 0x0001) && (c <= 0x007F)) {
+                bytearr[count++] = (byte) c;
+
+            } else if (c > 0x07FF) {
+                bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+                bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+                bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+            } else {
+                bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+                bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+            }
+        }
+
+        this.position = count;
+    }
+
+
+    /**
+     * Replaces the buffer with a larger one: at least double the current length and at
+     * least {@code minCapacityAdd} bytes longer, capped at {@code maxSize}.
+     *
+     * @throws EOFException if even {@code minCapacityAdd} extra bytes would exceed {@code maxSize}
+     * @throws IOException  if the computed length overflows to a negative array size
+     */
+    private void resize(int minCapacityAdd) throws IOException {
+        try {
+            int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd);
+
+            if (newLen > maxSize) {
+
+                if (this.buffer.length + minCapacityAdd > maxSize) {
+                    throw new EOFException("Exceeded maximum capacity");
+                }
+
+                newLen = maxSize;
+            }
+
+            final byte[] nb = new byte[newLen];
+            System.arraycopy(this.buffer, 0, nb, 0, this.position);
+            this.buffer = nb;
+            this.wrapper = ByteBuffer.wrap(this.buffer);
+        }
+        catch (NegativeArraySizeException nasex) {
+            // Keep the original exception as the cause so the stack trace is not lost.
+            throw new IOException("Serialization failed because the record length would exceed 2GB (max addressable array size in Java).", nasex);
+        }
+    }
+
+    @SuppressWarnings("restriction")
+    private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+
+    @SuppressWarnings("restriction")
+    private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+
+    private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
+
+    /**
+     * Advances the write position by {@code numBytes} without writing anything.
+     * Unlike the {@code write*} methods, this does not grow the buffer.
+     *
+     * @throws EOFException if fewer than {@code numBytes} bytes are left in the current buffer
+     */
+    @Override
+    public void skipBytesToWrite(int numBytes) throws IOException {
+        if(buffer.length - this.position < numBytes){
+            throw new EOFException("Could not skip " + numBytes + " bytes.");
+        }
+
+        this.position += numBytes;
+    }
+
+    /**
+     * Reads {@code numBytes} bytes from {@code source} directly into the buffer.
+     * Unlike the other {@code write*} methods, this does not grow the buffer.
+     *
+     * @throws EOFException if fewer than {@code numBytes} bytes are left in the current buffer
+     */
+    @Override
+    public void write(DataInputView source, int numBytes) throws IOException {
+        if(buffer.length - this.position < numBytes){
+            throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow.");
+        }
+
+        // NOTE(review): the return value of read(...) is ignored and the position always
+        // advances by numBytes; confirm that DataInputView.read fills the full range.
+        source.read(this.buffer, this.position, numBytes);
+        this.position += numBytes;
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java
new file mode 100644
index 0000000..cfc4914
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. 
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+/**
+ * Runs the {@link TupleComparatorTestBase} checks on {@code Tuple3<Integer, Long, Double>}
+ * records with a comparator keyed on fields 0 and 1.
+ */
+public class TupleComparatorILD2Test extends TupleComparatorTestBase<Tuple3<Integer, Long, Double>> {
+
+    /** Records listed in ascending order of (field 0, field 1); field 2 is not a key. */
+    @SuppressWarnings("unchecked")
+    Tuple3<Integer, Long, Double>[] dataISD = new Tuple3[]{
+        tuple(4, 14L, 20.0),
+        tuple(4, 15L, 23.2),
+        tuple(5, 15L, 20.0),
+        tuple(5, 20L, 20.0),
+        tuple(6, 20L, 23.2),
+        tuple(6, 29L, 20.0),
+        tuple(7, 29L, 20.0),
+        tuple(7, 34L, 23.2)
+    };
+
+    /** Shorthand that boxes the three values into a {@code Tuple3}. */
+    private static Tuple3<Integer, Long, Double> tuple(int f0, long f1, double f2) {
+        return new Tuple3<Integer, Long, Double>(f0, f1, f2);
+    }
+
+    /**
+     * @param ascending sort direction handed to both field comparators
+     * @return a comparator over key positions 0 (int) and 1 (long)
+     */
+    @Override
+    protected TupleComparator<Tuple3<Integer, Long, Double>> createComparator(boolean ascending) {
+        final int[] keyPositions = {0, 1};
+        final TypeComparator[] keyComparators = {
+            new IntComparator(ascending),
+            new LongComparator(ascending)
+        };
+        final TypeSerializer[] serializers = { IntSerializer.INSTANCE, LongSerializer.INSTANCE };
+
+        return new TupleComparator<Tuple3<Integer, Long, Double>>(keyPositions, keyComparators, serializers);
+    }
+
+    /** @return a tuple serializer built from fresh int, long and double field serializers */
+    @SuppressWarnings("unchecked")
+    @Override
+    protected TupleSerializer<Tuple3<Integer, Long, Double>> createSerializer() {
+        final Class<Tuple3<Integer, Long, Double>> tupleClass =
+            (Class<Tuple3<Integer, Long, Double>>) (Class<?>) Tuple3.class;
+        final TypeSerializer[] fieldSerializers = {
+            new IntSerializer(),
+            new LongSerializer(),
+            new DoubleSerializer()
+        };
+
+        return new TupleSerializer<Tuple3<Integer, Long, Double>>(tupleClass, fieldSerializers);
+    }
+
+    /** @return the pre-sorted records in {@link #dataISD} */
+    @Override
+    protected Tuple3<Integer, Long, Double>[] getSortedTestData() {
+        return dataISD;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java
new file mode 100644
index 0000000..e5a0e6c
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. 
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+/**
+ * Runs the {@link TupleComparatorTestBase} checks on {@code Tuple3<Integer, Long, Double>}
+ * records with a comparator keyed on all three fields, in field order.
+ */
+public class TupleComparatorILD3Test extends TupleComparatorTestBase<Tuple3<Integer, Long, Double>> {
+
+    /** Records listed in ascending order of (field 0, field 1, field 2). */
+    @SuppressWarnings("unchecked")
+    Tuple3<Integer, Long, Double>[] dataISD = new Tuple3[]{
+        tuple(4, 4L, 20.0),
+        tuple(4, 4L, 23.2),
+        tuple(4, 9L, 20.0),
+        tuple(5, 4L, 20.0),
+        tuple(5, 4L, 23.2),
+        tuple(5, 9L, 20.0),
+        tuple(6, 4L, 20.0),
+        tuple(6, 4L, 23.2)
+    };
+
+    /** Shorthand that boxes the three values into a {@code Tuple3}. */
+    private static Tuple3<Integer, Long, Double> tuple(int f0, long f1, double f2) {
+        return new Tuple3<Integer, Long, Double>(f0, f1, f2);
+    }
+
+    /**
+     * @param ascending sort direction handed to all three field comparators
+     * @return a comparator over key positions 0 (int), 1 (long) and 2 (double)
+     */
+    @Override
+    protected TupleComparator<Tuple3<Integer, Long, Double>> createComparator(boolean ascending) {
+        final int[] keyPositions = {0, 1, 2};
+        final TypeComparator[] keyComparators = {
+            new IntComparator(ascending),
+            new LongComparator(ascending),
+            new DoubleComparator(ascending)
+        };
+        final TypeSerializer[] serializers = {
+            IntSerializer.INSTANCE, LongSerializer.INSTANCE, DoubleSerializer.INSTANCE
+        };
+
+        return new TupleComparator<Tuple3<Integer, Long, Double>>(keyPositions, keyComparators, serializers);
+    }
+
+    /** @return a tuple serializer built from fresh int, long and double field serializers */
+    @SuppressWarnings("unchecked")
+    @Override
+    protected TupleSerializer<Tuple3<Integer, Long, Double>> createSerializer() {
+        final Class<Tuple3<Integer, Long, Double>> tupleClass =
+            (Class<Tuple3<Integer, Long, Double>>) (Class<?>) Tuple3.class;
+        final TypeSerializer[] fieldSerializers = {
+            new IntSerializer(),
+            new LongSerializer(),
+            new DoubleSerializer()
+        };
+
+        return new TupleSerializer<Tuple3<Integer, Long, Double>>(tupleClass, fieldSerializers);
+    }
+
+    /** @return the pre-sorted records in {@link #dataISD} */
+    @Override
+    protected Tuple3<Integer, Long, Double>[] getSortedTestData() {
+        return dataISD;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java
new file mode 100644
index 0000000..a1e6c40
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. 
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+/**
+ * Runs the {@link TupleComparatorTestBase} checks on {@code Tuple3<Integer, Long, Double>}
+ * records with a comparator keyed on all three fields in the order 2, 0, 1.
+ */
+public class TupleComparatorILDC3Test extends TupleComparatorTestBase<Tuple3<Integer, Long, Double>> {
+
+    /** Records listed in ascending order of (field 2, field 0, field 1). */
+    @SuppressWarnings("unchecked")
+    Tuple3<Integer, Long, Double>[] dataISD = new Tuple3[]{
+        tuple(4, 4L, 20.0),
+        tuple(5, 1L, 20.0),
+        tuple(5, 2L, 20.0),
+        tuple(5, 10L, 23.0),
+        tuple(5, 19L, 24.0),
+        tuple(5, 20L, 24.0),
+        tuple(5, 24L, 25.0),
+        tuple(5, 25L, 25.0)
+    };
+
+    /** Shorthand that boxes the three values into a {@code Tuple3}. */
+    private static Tuple3<Integer, Long, Double> tuple(int f0, long f1, double f2) {
+        return new Tuple3<Integer, Long, Double>(f0, f1, f2);
+    }
+
+    /**
+     * @param ascending sort direction handed to all three field comparators
+     * @return a comparator over key positions 2 (double), 0 (int) and 1 (long)
+     */
+    @Override
+    protected TupleComparator<Tuple3<Integer, Long, Double>> createComparator(boolean ascending) {
+        final int[] keyPositions = {2, 0, 1};
+        // Comparators follow the key order given in keyPositions.
+        final TypeComparator[] keyComparators = {
+            new DoubleComparator(ascending),
+            new IntComparator(ascending),
+            new LongComparator(ascending)
+        };
+        // NOTE(review): the serializers are listed in tuple-field order, not key order;
+        // presumably that is what TupleComparator expects. Confirm before reordering.
+        final TypeSerializer[] serializers = {
+            IntSerializer.INSTANCE, LongSerializer.INSTANCE, DoubleSerializer.INSTANCE
+        };
+
+        return new TupleComparator<Tuple3<Integer, Long, Double>>(keyPositions, keyComparators, serializers);
+    }
+
+    /** @return a tuple serializer built from fresh int, long and double field serializers */
+    @SuppressWarnings("unchecked")
+    @Override
+    protected TupleSerializer<Tuple3<Integer, Long, Double>> createSerializer() {
+        final Class<Tuple3<Integer, Long, Double>> tupleClass =
+            (Class<Tuple3<Integer, Long, Double>>) (Class<?>) Tuple3.class;
+        final TypeSerializer[] fieldSerializers = {
+            new IntSerializer(),
+            new LongSerializer(),
+            new DoubleSerializer()
+        };
+
+        return new TupleSerializer<Tuple3<Integer, Long, Double>>(tupleClass, fieldSerializers);
+    }
+
+    /** @return the pre-sorted records in {@link #dataISD} */
+    @Override
+    protected Tuple3<Integer, Long, Double>[] getSortedTestData() {
+        return dataISD;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java
new file mode 100644
index 0000000..b5c0c1f
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. 
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+/**
+ * Comparator test fixture for {@code Tuple3<Integer, Long, Double>} with a
+ * single key: field 1 (Long). Fields 0 and 2 do not take part in the comparison.
+ */
+public class TupleComparatorILDX1Test extends TupleComparatorTestBase<Tuple3<Integer, Long, Double>> {
+
+    /**
+     * Fixture rows, listed in ascending order of field 1. Field 0 is constant
+     * and field 2 is deliberately not monotonic.
+     */
+    @SuppressWarnings("unchecked")
+    Tuple3<Integer, Long, Double>[] dataISD = new Tuple3[]{
+        new Tuple3<Integer, Long, Double>(4, 4L, 20.0),
+        new Tuple3<Integer, Long, Double>(4, 5L, 23.2),
+        new Tuple3<Integer, Long, Double>(4, 9L, 20.0),
+        new Tuple3<Integer, Long, Double>(4, 10L, 24.0),
+        new Tuple3<Integer, Long, Double>(4, 19L, 23.2),
+        new Tuple3<Integer, Long, Double>(4, 20L, 24.0),
+        new Tuple3<Integer, Long, Double>(4, 24L, 20.0),
+        new Tuple3<Integer, Long, Double>(4, 25L, 23.2)
+    };
+
+    /**
+     * Builds the comparator under test.
+     *
+     * @param ascending sort direction handed to the field comparator
+     * @return a comparator keyed on field 1 only
+     */
+    @Override
+    protected TupleComparator<Tuple3<Integer, Long, Double>> createComparator(boolean ascending) {
+        return new TupleComparator<Tuple3<Integer, Long, Double>>(
+            new int[]{1},
+            new TypeComparator[]{
+                new LongComparator(ascending)
+            },
+            // Only positions 0 and 1 get a serializer here.
+            // NOTE(review): presumably the comparator needs serializers only up to
+            // the last key position -- confirm against TupleComparator.
+            new TypeSerializer[]{ IntSerializer.INSTANCE, LongSerializer.INSTANCE });
+    }
+
+    /**
+     * Builds the serializer for the tuple type under test.
+     *
+     * @return a tuple serializer with one field serializer per tuple position
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    protected TupleSerializer<Tuple3<Integer, Long, Double>> createSerializer() {
+        return new TupleSerializer<Tuple3<Integer, Long, Double>>(
+            // Tuple3.class carries no type arguments, hence the double cast
+            (Class<Tuple3<Integer, Long, Double>>) (Class<?>) Tuple3.class,
+            // shared singletons, the same ones createComparator uses
+            new TypeSerializer[]{
+                IntSerializer.INSTANCE,
+                LongSerializer.INSTANCE,
+                DoubleSerializer.INSTANCE});
+    }
+
+    /**
+     * @return the fixture rows in ascending key order
+     */
+    @Override
+    protected Tuple3<Integer, Long, Double>[] getSortedTestData() {
+        return dataISD;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java
new file mode 100644
index 0000000..793a2f4
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+/**
+ * Comparator test fixture for {@code Tuple3<Integer, Long, Double>} with two
+ * keys in non-natural order: field 2 (Double) first, then field 1 (Long).
+ * Field 0 does not take part in the comparison.
+ */
+public class TupleComparatorILDXC2Test extends TupleComparatorTestBase<Tuple3<Integer, Long, Double>> {
+
+    /**
+     * Fixture rows, listed in ascending order of (field 2, field 1). Field 1 on
+     * its own is not monotonic (5 is followed by 3), so the rows are only
+     * sorted when field 2 is compared first.
+     */
+    @SuppressWarnings("unchecked")
+    Tuple3<Integer, Long, Double>[] dataISD = new Tuple3[]{
+        new Tuple3<Integer, Long, Double>(4, 4L, 20.0),
+        new Tuple3<Integer, Long, Double>(4, 5L, 20.0),
+        new Tuple3<Integer, Long, Double>(4, 3L, 23.0),
+        new Tuple3<Integer, Long, Double>(4, 19L, 23.0),
+        new Tuple3<Integer, Long, Double>(4, 17L, 24.0),
+        new Tuple3<Integer, Long, Double>(4, 18L, 24.0),
+        new Tuple3<Integer, Long, Double>(4, 24L, 25.0),
+        new Tuple3<Integer, Long, Double>(4, 25L, 25.0)
+    };
+
+    /**
+     * Builds the comparator under test.
+     *
+     * @param ascending sort direction handed to every per-field comparator
+     * @return a comparator keyed on field 2, then field 1
+     */
+    @Override
+    protected TupleComparator<Tuple3<Integer, Long, Double>> createComparator(boolean ascending) {
+        return new TupleComparator<Tuple3<Integer, Long, Double>>(
+            // key positions; the comparators below are listed in the same order
+            new int[]{2, 1},
+            new TypeComparator[]{
+                new DoubleComparator(ascending),
+                new LongComparator(ascending)
+            },
+            // Serializers follow the tuple's declared field types
+            // (Integer, Long, Double), matching createSerializer() -- not the key order.
+            // NOTE(review): confirm against TupleComparator that this array is
+            // indexed by field position.
+            new TypeSerializer[]{ IntSerializer.INSTANCE, LongSerializer.INSTANCE, DoubleSerializer.INSTANCE });
+    }
+
+    /**
+     * Builds the serializer for the tuple type under test.
+     *
+     * @return a tuple serializer with one field serializer per tuple position
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    protected TupleSerializer<Tuple3<Integer, Long, Double>> createSerializer() {
+        return new TupleSerializer<Tuple3<Integer, Long, Double>>(
+            // Tuple3.class carries no type arguments, hence the double cast
+            (Class<Tuple3<Integer, Long, Double>>) (Class<?>) Tuple3.class,
+            // shared singletons, the same ones createComparator uses
+            new TypeSerializer[]{
+                IntSerializer.INSTANCE,
+                LongSerializer.INSTANCE,
+                DoubleSerializer.INSTANCE});
+    }
+
+    /**
+     * @return the fixture rows in ascending key order
+     */
+    @Override
+    protected Tuple3<Integer, Long, Double>[] getSortedTestData() {
+        return dataISD;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java
new file mode 100644
index 0000000..8cdee9b
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+/**
+ * Comparator test fixture for {@code Tuple3<Integer, String, Double>} with a
+ * single key: field 0 (Integer).
+ */
+public class TupleComparatorISD1Test extends TupleComparatorTestBase<Tuple3<Integer, String, Double>> {
+
+    /**
+     * Fixture rows, listed in ascending order of field 0. The String and
+     * Double fields are deliberately not monotonic.
+     */
+    @SuppressWarnings("unchecked")
+    Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
+        new Tuple3<Integer, String, Double>(4, "hello", 20.0),
+        new Tuple3<Integer, String, Double>(5, "hello", 23.2),
+        new Tuple3<Integer, String, Double>(6, "world", 20.0),
+        new Tuple3<Integer, String, Double>(7, "hello", 20.0),
+        new Tuple3<Integer, String, Double>(8, "hello", 23.2),
+        new Tuple3<Integer, String, Double>(9, "world", 20.0),
+        new Tuple3<Integer, String, Double>(10, "hello", 20.0),
+        new Tuple3<Integer, String, Double>(11, "hello", 23.2)
+    };
+
+    /**
+     * Builds the comparator under test.
+     *
+     * @param ascending sort direction handed to the field comparator
+     * @return a comparator keyed on field 0 only
+     */
+    @Override
+    protected TupleComparator<Tuple3<Integer, String, Double>> createComparator(boolean ascending) {
+        return new TupleComparator<Tuple3<Integer, String, Double>>(
+            new int[]{0},
+            new TypeComparator[]{ new IntComparator(ascending) },
+            // Only position 0 gets a serializer here.
+            // NOTE(review): presumably the comparator needs serializers only up to
+            // the last key position -- confirm against TupleComparator.
+            new TypeSerializer[]{ IntSerializer.INSTANCE });
+    }
+
+    /**
+     * Builds the serializer for the tuple type under test.
+     *
+     * @return a tuple serializer with one field serializer per tuple position
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    protected TupleSerializer<Tuple3<Integer, String, Double>> createSerializer() {
+        return new TupleSerializer<Tuple3<Integer, String, Double>>(
+            // Tuple3.class carries no type arguments, hence the double cast
+            (Class<Tuple3<Integer, String, Double>>) (Class<?>) Tuple3.class,
+            // shared singletons, as createComparator already does for IntSerializer
+            new TypeSerializer[]{
+                IntSerializer.INSTANCE,
+                StringSerializer.INSTANCE,
+                DoubleSerializer.INSTANCE});
+    }
+
+    /**
+     * @return the fixture rows in ascending key order
+     */
+    @Override
+    protected Tuple3<Integer, String, Double>[] getSortedTestData() {
+        return dataISD;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java
new file mode 100644
index 0000000..06c292f
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringComparator;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+/**
+ * Comparator test fixture for {@code Tuple3<Integer, String, Double>} with two
+ * keys: field 0 (Integer), then field 1 (String). Field 2 does not take part
+ * in the comparison.
+ */
+public class TupleComparatorISD2Test extends TupleComparatorTestBase<Tuple3<Integer, String, Double>> {
+
+    /**
+     * Fixture rows, listed in ascending order of (field 0, field 1). The
+     * Double field is deliberately not monotonic.
+     */
+    @SuppressWarnings("unchecked")
+    Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
+        new Tuple3<Integer, String, Double>(4, "hello", 20.0),
+        new Tuple3<Integer, String, Double>(4, "world", 23.2),
+        new Tuple3<Integer, String, Double>(5, "hello", 20.0),
+        new Tuple3<Integer, String, Double>(5, "world", 20.0),
+        new Tuple3<Integer, String, Double>(6, "hello", 23.2),
+        new Tuple3<Integer, String, Double>(6, "world", 20.0),
+        new Tuple3<Integer, String, Double>(7, "hello", 20.0),
+        new Tuple3<Integer, String, Double>(7, "world", 23.2)
+    };
+
+    /**
+     * Builds the comparator under test.
+     *
+     * @param ascending sort direction handed to every per-field comparator
+     * @return a comparator keyed on field 0, then field 1
+     */
+    @Override
+    protected TupleComparator<Tuple3<Integer, String, Double>> createComparator(boolean ascending) {
+        return new TupleComparator<Tuple3<Integer, String, Double>>(
+            // key positions; the comparators below are listed in the same order
+            new int[]{0, 1},
+            new TypeComparator[]{
+                new IntComparator(ascending),
+                new StringComparator(ascending)
+            },
+            // serializers are listed by tuple field position
+            new TypeSerializer[]{ IntSerializer.INSTANCE, StringSerializer.INSTANCE, DoubleSerializer.INSTANCE });
+    }
+
+    /**
+     * Builds the serializer for the tuple type under test.
+     *
+     * @return a tuple serializer with one field serializer per tuple position
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    protected TupleSerializer<Tuple3<Integer, String, Double>> createSerializer() {
+        return new TupleSerializer<Tuple3<Integer, String, Double>>(
+            // Tuple3.class carries no type arguments, hence the double cast
+            (Class<Tuple3<Integer, String, Double>>) (Class<?>) Tuple3.class,
+            // shared singletons, the same ones createComparator uses
+            new TypeSerializer[]{
+                IntSerializer.INSTANCE,
+                StringSerializer.INSTANCE,
+                DoubleSerializer.INSTANCE});
+    }
+
+    /**
+     * @return the fixture rows in ascending key order
+     */
+    @Override
+    protected Tuple3<Integer, String, Double>[] getSortedTestData() {
+        return dataISD;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java
new file mode 100644
index 0000000..d823a29
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringComparator;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+/**
+ * Comparator test fixture for {@code Tuple3<Integer, String, Double>} in which
+ * all three fields are keys, compared in their natural position order 0, 1, 2.
+ */
+public class TupleComparatorISD3Test extends TupleComparatorTestBase<Tuple3<Integer, String, Double>> {
+
+    /**
+     * Fixture rows, listed in ascending order of (field 0, field 1, field 2).
+     */
+    @SuppressWarnings("unchecked")
+    Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
+        new Tuple3<Integer, String, Double>(4, "hello", 20.0),
+        new Tuple3<Integer, String, Double>(4, "hello", 23.2),
+        new Tuple3<Integer, String, Double>(4, "world", 20.0),
+        new Tuple3<Integer, String, Double>(5, "hello", 20.0),
+        new Tuple3<Integer, String, Double>(5, "hello", 23.2),
+        new Tuple3<Integer, String, Double>(5, "world", 20.0),
+        new Tuple3<Integer, String, Double>(6, "hello", 20.0),
+        new Tuple3<Integer, String, Double>(6, "hello", 23.2)
+    };
+
+    /**
+     * Builds the comparator under test.
+     *
+     * @param ascending sort direction handed to every per-field comparator
+     * @return a comparator keyed on fields 0, 1 and 2, in that order
+     */
+    @Override
+    protected TupleComparator<Tuple3<Integer, String, Double>> createComparator(boolean ascending) {
+        return new TupleComparator<Tuple3<Integer, String, Double>>(
+            // key positions; the comparators below are listed in the same order
+            new int[]{0, 1, 2},
+            new TypeComparator[]{
+                new IntComparator(ascending),
+                new StringComparator(ascending),
+                new DoubleComparator(ascending)
+            },
+            // serializers are listed by tuple field position
+            new TypeSerializer[]{ IntSerializer.INSTANCE, StringSerializer.INSTANCE, DoubleSerializer.INSTANCE });
+    }
+
+    /**
+     * Builds the serializer for the tuple type under test.
+     *
+     * @return a tuple serializer with one field serializer per tuple position
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    protected TupleSerializer<Tuple3<Integer, String, Double>> createSerializer() {
+        return new TupleSerializer<Tuple3<Integer, String, Double>>(
+            // Tuple3.class carries no type arguments, hence the double cast
+            (Class<Tuple3<Integer, String, Double>>) (Class<?>) Tuple3.class,
+            // shared singletons, the same ones createComparator uses
+            new TypeSerializer[]{
+                IntSerializer.INSTANCE,
+                StringSerializer.INSTANCE,
+                DoubleSerializer.INSTANCE});
+    }
+
+    /**
+     * @return the fixture rows in ascending key order
+     */
+    @Override
+    protected Tuple3<Integer, String, Double>[] getSortedTestData() {
+        return dataISD;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
new file mode 100644
index 0000000..cf73be2
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringComparator;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+/**
+ * Comparator test fixture for a {@code Tuple3} whose three fields are themselves
+ * {@code Tuple2}s. The only key is field 0, a nested {@code Tuple2<String, Double>}
+ * compared on both of its own fields.
+ */
+public class TupleComparatorTTT1Test extends TupleComparatorTestBase<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> {
+
+    /**
+     * Fixture rows, listed in ascending order of the nested (String, Double)
+     * pair held in field 0.
+     */
+    @SuppressWarnings("unchecked")
+    Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] dataISD = new Tuple3[]{
+        row("hello", 1.0, 1L, 1L, 4, -10L),
+        row("hello", 2.0, 1L, 2L, 4, -5L),
+        row("hello", 3.0, 1L, 3L, 4, 0L),
+        row("hello", 3.5, 1L, 4L, 4, 5L),
+        row("hello", 4325.12, 1L, 5L, 4, 15L),
+        row("world", 1.0, 2L, 4L, 45, -5L),
+        row("world", 2.0, 2L, 6L, 45, 5L),
+        row("world", 3.0, 2L, 8L, 323, 2L),
+        row("world", 3.5, 2L, 9L, 323, 5L),
+        row("world", 4325.12, 2L, 123L, 555, 1L)
+    };
+
+    /** Assembles one fixture row from the six leaf values, in field order. */
+    private static Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> row(
+            String word, double number, long firstLong, long secondLong, int id, long offset) {
+        return new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(
+            new Tuple2<String, Double>(word, number),
+            new Tuple2<Long, Long>(firstLong, secondLong),
+            new Tuple2<Integer, Long>(id, offset));
+    }
+
+    /** Creates a fresh serializer for each of the three nested fields, by position. */
+    @SuppressWarnings("unchecked")
+    private static TypeSerializer[] nestedFieldSerializers() {
+        return new TypeSerializer[] {
+            new TupleSerializer<Tuple2<String, Double>>(
+                (Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
+                new TypeSerializer[] { StringSerializer.INSTANCE, DoubleSerializer.INSTANCE }),
+            new TupleSerializer<Tuple2<Long, Long>>(
+                (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
+                new TypeSerializer[] { LongSerializer.INSTANCE, LongSerializer.INSTANCE }),
+            new TupleSerializer<Tuple2<Integer, Long>>(
+                (Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
+                new TypeSerializer[] { IntSerializer.INSTANCE, LongSerializer.INSTANCE })
+        };
+    }
+
+    /**
+     * Builds the comparator under test.
+     *
+     * @param ascending sort direction handed to the leaf comparators
+     * @return a comparator keyed on nested field 0 only
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    protected TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createComparator(
+            boolean ascending) {
+        // comparator for the nested (String, Double) pair, keyed on both of its fields
+        final TypeComparator stringDoubleComparator = new TupleComparator<Tuple2<String, Double>>(
+            new int[] { 0, 1 },
+            new TypeComparator[] {
+                new StringComparator(ascending),
+                new DoubleComparator(ascending) },
+            new TypeSerializer[] {
+                StringSerializer.INSTANCE,
+                DoubleSerializer.INSTANCE });
+
+        return new TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
+            new int[] { 0 },
+            new TypeComparator[] { stringDoubleComparator },
+            nestedFieldSerializers());
+    }
+
+    /**
+     * Builds the serializer for the tuple type under test.
+     *
+     * @return a tuple serializer with one nested tuple serializer per position
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    protected TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createSerializer() {
+        return new TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
+            (Class<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>) (Class<?>) Tuple3.class,
+            nestedFieldSerializers());
+    }
+
+    /**
+     * @return the fixture rows in ascending key order
+     */
+    @Override
+    protected Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] getSortedTestData() {
+        return this.dataISD;
+    }
+
+    /**
+     * Asserts that two rows are equal field by field. A field holding a
+     * {@code Tuple2} is compared element-wise through the overload below;
+     * any other field goes straight to {@code assertEquals}.
+     *
+     * @param message assertion message passed through to JUnit
+     * @param should the expected row
+     * @param is the actual row
+     */
+    @Override
+    protected void deepEquals(
+            String message,
+            Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> should,
+            Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> is) {
+
+        final int arity = should.getArity();
+        for (int pos = 0; pos < arity; pos++) {
+            final Object expected = should.getField(pos);
+            final Object actual = is.getField(pos);
+            if (expected instanceof Tuple2) {
+                this.deepEquals(message, (Tuple2<?, ?>) expected, (Tuple2<?, ?>) actual);
+                continue;
+            }
+            assertEquals(message, expected, actual);
+        }
+    }
+
+    /**
+     * Asserts that two nested pairs are equal, one field at a time.
+     *
+     * @param message assertion message passed through to JUnit
+     * @param should the expected pair
+     * @param is the actual pair
+     */
+    protected void deepEquals(String message, Tuple2<?,?> should, Tuple2<?,?> is) {
+        final int arity = should.getArity();
+        for (int pos = 0; pos < arity; pos++) {
+            final Object expected = should.getField(pos);
+            final Object actual = is.getField(pos);
+            assertEquals(message, expected, actual);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java
new file mode 100644
index 0000000..4b07c61
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringComparator;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+/**
+ * Comparator test fixture for a {@code Tuple3} whose three fields are themselves
+ * {@code Tuple2}s. Two nested fields are keys: field 0
+ * ({@code Tuple2<String, Double>}) first, then field 2
+ * ({@code Tuple2<Integer, Long>}); each is compared on both of its own fields.
+ */
+public class TupleComparatorTTT2Test extends TupleComparatorTestBase<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> {
+
+    /**
+     * Fixture rows, listed in ascending order of the nested (String, Double)
+     * pair held in field 0.
+     */
+    @SuppressWarnings("unchecked")
+    Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] dataISD = new Tuple3[]{
+        row("hello", 1.0, 1L, 1L, 4, -10L),
+        row("hello", 2.0, 1L, 2L, 4, -5L),
+        row("hello", 3.0, 1L, 3L, 4, 0L),
+        row("hello", 3.5, 1L, 4L, 4, 5L),
+        row("hello", 4325.12, 1L, 5L, 4, 15L),
+        row("world", 1.0, 2L, 4L, 45, -5L),
+        row("world", 2.0, 2L, 6L, 45, 5L),
+        row("world", 3.0, 2L, 8L, 323, 2L),
+        row("world", 3.5, 2L, 9L, 323, 5L),
+        row("world", 4325.12, 2L, 123L, 555, 1L)
+    };
+
+    /** Assembles one fixture row from the six leaf values, in field order. */
+    private static Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> row(
+            String word, double number, long firstLong, long secondLong, int id, long offset) {
+        return new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(
+            new Tuple2<String, Double>(word, number),
+            new Tuple2<Long, Long>(firstLong, secondLong),
+            new Tuple2<Integer, Long>(id, offset));
+    }
+
+    /** Creates a fresh serializer for each of the three nested fields, by position. */
+    @SuppressWarnings("unchecked")
+    private static TypeSerializer[] nestedFieldSerializers() {
+        return new TypeSerializer[] {
+            new TupleSerializer<Tuple2<String, Double>>(
+                (Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
+                new TypeSerializer[] { StringSerializer.INSTANCE, DoubleSerializer.INSTANCE }),
+            new TupleSerializer<Tuple2<Long, Long>>(
+                (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
+                new TypeSerializer[] { LongSerializer.INSTANCE, LongSerializer.INSTANCE }),
+            new TupleSerializer<Tuple2<Integer, Long>>(
+                (Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
+                new TypeSerializer[] { IntSerializer.INSTANCE, LongSerializer.INSTANCE })
+        };
+    }
+
+    /**
+     * Builds the comparator under test.
+     *
+     * @param ascending sort direction handed to the leaf comparators
+     * @return a comparator keyed on nested field 0, then nested field 2
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    protected TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createComparator(
+            boolean ascending) {
+        // comparator for the nested (String, Double) pair at position 0
+        final TypeComparator stringDoubleComparator = new TupleComparator<Tuple2<String, Double>>(
+            new int[] { 0, 1 },
+            new TypeComparator[] {
+                new StringComparator(ascending),
+                new DoubleComparator(ascending) },
+            new TypeSerializer[] {
+                StringSerializer.INSTANCE,
+                DoubleSerializer.INSTANCE });
+
+        // comparator for the nested (Integer, Long) pair at position 2
+        final TypeComparator intLongComparator = new TupleComparator<Tuple2<Integer, Long>>(
+            new int[] { 0, 1 },
+            new TypeComparator[] {
+                new IntComparator(ascending),
+                new LongComparator(ascending) },
+            new TypeSerializer[] {
+                IntSerializer.INSTANCE,
+                LongSerializer.INSTANCE });
+
+        return new TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
+            new int[] { 0, 2 },
+            new TypeComparator[] { stringDoubleComparator, intLongComparator },
+            nestedFieldSerializers());
+    }
+
+    /**
+     * Builds the serializer for the tuple type under test.
+     *
+     * @return a tuple serializer with one nested tuple serializer per position
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    protected TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createSerializer() {
+        return new TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
+            (Class<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>) (Class<?>) Tuple3.class,
+            nestedFieldSerializers());
+    }
+
+    /**
+     * @return the fixture rows in ascending key order
+     */
+    @Override
+    protected Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] getSortedTestData() {
+        return this.dataISD;
+    }
+
+    /**
+     * Asserts that two rows are equal field by field. A field holding a
+     * {@code Tuple2} is compared element-wise through the overload below;
+     * any other field goes straight to {@code assertEquals}.
+     *
+     * @param message assertion message passed through to JUnit
+     * @param should the expected row
+     * @param is the actual row
+     */
+    @Override
+    protected void deepEquals(
+            String message,
+            Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> should,
+            Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> is) {
+
+        final int arity = should.getArity();
+        for (int pos = 0; pos < arity; pos++) {
+            final Object expected = should.getField(pos);
+            final Object actual = is.getField(pos);
+            if (expected instanceof Tuple2) {
+                this.deepEquals(message, (Tuple2<?, ?>) expected, (Tuple2<?, ?>) actual);
+                continue;
+            }
+            assertEquals(message, expected, actual);
+        }
+    }
+
+    /**
+     * Asserts that two nested pairs are equal, one field at a time.
+     *
+     * @param message assertion message passed through to JUnit
+     * @param should the expected pair
+     * @param is the actual pair
+     */
+    protected void deepEquals(String message, Tuple2<?,?> should, Tuple2<?,?> is) {
+        final int arity = should.getArity();
+        for (int pos = 0; pos < arity; pos++) {
+            final Object expected = should.getField(pos);
+            final Object actual = is.getField(pos);
+            assertEquals(message, expected, actual);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java
new file mode 100644
index 0000000..0dfc094
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.flink.api.java.typeutils.runtime; + +import static org.junit.Assert.assertEquals; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.DoubleComparator; +import org.apache.flink.api.common.typeutils.base.DoubleSerializer; +import org.apache.flink.api.common.typeutils.base.IntComparator; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongComparator; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringComparator; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; + /** Comparator test for a Tuple3 whose three fields are all Tuple2 values. The comparator under test uses all three nested tuples as keys, and the dataISD array supplies the test records. */ +public class TupleComparatorTTT3Test extends TupleComparatorTestBase<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>{ + @SuppressWarnings("unchecked") + Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] dataISD = new Tuple3[]{ + new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 1.0), new Tuple2<Long, Long>(1L, 1L), new Tuple2<Integer, Long>(4, -10L)), + new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 2.0), new Tuple2<Long, Long>(1L, 2L), new Tuple2<Integer, Long>(4, -5L)), + new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.0), new Tuple2<Long, Long>(1L, 3L), new Tuple2<Integer, Long>(4, 0L)), + new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.5), new Tuple2<Long, Long>(1L, 4L), new Tuple2<Integer, Long>(4, 5L)), 
+ new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 4325.12), new Tuple2<Long, Long>(1L, 5L), new Tuple2<Integer, Long>(4, 15L)), + new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 1.0), new Tuple2<Long, Long>(2L, 4L), new Tuple2<Integer, Long>(45, -5L)), + new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 2.0), new Tuple2<Long, Long>(2L, 6L), new Tuple2<Integer, Long>(45, 5L)), + new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.0), new Tuple2<Long, Long>(2L, 8L), new Tuple2<Integer, Long>(323, 2L)), + new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.5), new Tuple2<Long, Long>(2L, 9L), new Tuple2<Integer, Long>(323, 5L)), + new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 4325.12), new Tuple2<Long, Long>(2L, 123L), new Tuple2<Integer, Long>(555, 1L)) + + }; + /** Builds a TupleComparator that uses positions 0, 1 and 2 of the outer Tuple3 as keys. Each key is compared by a nested TupleComparator over both fields of its Tuple2, and every leaf comparator receives the ascending flag. */ + @SuppressWarnings("unchecked") + @Override + protected TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createComparator( + boolean ascending) { + return new TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>( + new int[] { 0, 1, 2 }, + new TypeComparator[] { + new TupleComparator<Tuple2<String, Double>>( + new int[] { 0, 1 }, + new TypeComparator[] { + new StringComparator(ascending), + new DoubleComparator(ascending) }, + new TypeSerializer[] { + StringSerializer.INSTANCE, + DoubleSerializer.INSTANCE }), + new TupleComparator<Tuple2<Long, Long>>( + new int[] { 0, 1 }, + new TypeComparator[] { + new LongComparator(ascending), + new LongComparator(ascending) }, + new TypeSerializer[] { + LongSerializer.INSTANCE, + LongSerializer.INSTANCE }), + 
new TupleComparator<Tuple2<Integer, Long>>( + new int[] { 0, 1 }, + new TypeComparator[] { + new IntComparator(ascending), + new LongComparator(ascending) }, + new TypeSerializer[] { + IntSerializer.INSTANCE, + LongSerializer.INSTANCE }) }, + new TypeSerializer[] { + new TupleSerializer<Tuple2<String, Double>>( + (Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class, + new TypeSerializer[] { + StringSerializer.INSTANCE, + DoubleSerializer.INSTANCE }), + new TupleSerializer<Tuple2<Long, Long>>( + (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class, + new TypeSerializer[] { + LongSerializer.INSTANCE, + LongSerializer.INSTANCE }), + new TupleSerializer<Tuple2<Integer, Long>>( + (Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class, + new TypeSerializer[] { + IntSerializer.INSTANCE, + LongSerializer.INSTANCE }) }); + } + /** Creates the TupleSerializer for the outer Tuple3, composed of one nested TupleSerializer per Tuple2 field. The double cast through a wildcard Class is what lets the raw Tuple class literals be used with the generic constructor, hence the unchecked suppression. */ + @SuppressWarnings("unchecked") + @Override + protected TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createSerializer() { + return new TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>( + (Class<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>) (Class<?>) Tuple3.class, + new TypeSerializer[]{ + new TupleSerializer<Tuple2<String, Double>> ( + (Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class, + new TypeSerializer[]{ + StringSerializer.INSTANCE, + DoubleSerializer.INSTANCE + }), + new TupleSerializer<Tuple2<Long, Long>> ( + (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class, + new TypeSerializer[]{ + LongSerializer.INSTANCE, + LongSerializer.INSTANCE + }), + new TupleSerializer<Tuple2<Integer, Long>> ( + (Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class, + new TypeSerializer[]{ + IntSerializer.INSTANCE, + LongSerializer.INSTANCE + }) + }); + } + /** Returns the dataISD array of this test as the sorted test data. */ + @Override + protected Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] getSortedTestData() { + return this.dataISD; + } + /** Asserts field by field that the two Tuple3 values are equal. Fields that are Tuple2 instances are handed to the Tuple2 overload of deepEquals, all other fields go straight to assertEquals with the given message. */ + @Override + protected void 
deepEquals( + String message, + Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> should, + Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> is) { + + for (int x = 0; x < should.getArity(); x++) { + // Check whether field is of type Tuple2 because assertEquals must be called on the non Tuple2 fields. + if(should.getField(x) instanceof Tuple2) { + this.deepEquals(message, (Tuple2<?,?>) should.getField(x), (Tuple2<?,?>)is.getField(x)); + } + else { + assertEquals(message, should.getField(x), is.getField(x)); + } + }// For + } + /** Asserts that every field of the two Tuple2 values is equal, iterating up to the arity of the expected tuple and passing message to assertEquals. */ + protected void deepEquals(String message, Tuple2<?,?> should, Tuple2<?,?> is) { + for (int x = 0; x < should.getArity(); x++) { + assertEquals(message, should.getField(x), is.getField(x)); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java new file mode 100644 index 0000000..017eb44 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.java.typeutils.runtime; + +import java.util.ArrayList; +import java.util.Random; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple0; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.Book; +import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.BookAuthor; +import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.ComplexNestedObject1; +import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.ComplexNestedObject2; +import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.SimpleTypes; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.util.StringUtils; +import org.junit.Assert; +import org.junit.Test; + /** Runs the TupleSerializerTestInstance checks against serializers that the TypeExtractor derives for Tuple0, Tuple1, Tuple2 and Tuple5 values with various field types. */ +public class TupleSerializerTest { + /** Covers Tuple0 using three references to the shared Tuple0.INSTANCE. */ + @Test + public void testTuple0() { + Tuple0[] testTuples = new Tuple0[] { Tuple0.INSTANCE, Tuple0.INSTANCE, Tuple0.INSTANCE }; + + runTests(testTuples); + } + /** Covers Tuple1 of Integer, including zero, negative one and the Integer.MAX_VALUE and Integer.MIN_VALUE boundaries. */ + @Test + public void testTuple1Int() { + @SuppressWarnings({"unchecked", "rawtypes"}) + Tuple1<Integer>[] testTuples = new Tuple1[] { + new Tuple1<Integer>(42), new Tuple1<Integer>(1), 
new Tuple1<Integer>(0), new Tuple1<Integer>(-1), + new Tuple1<Integer>(Integer.MAX_VALUE), new Tuple1<Integer>(Integer.MIN_VALUE) + }; + + runTests(testTuples); + } + /** Covers Tuple1 of String: random strings from a fixed seed, a short literal and empty strings. */ + @Test + public void testTuple1String() { + Random rnd = new Random(68761564135413L); + + @SuppressWarnings({"unchecked", "rawtypes"}) + Tuple1<String>[] testTuples = new Tuple1[] { + new Tuple1<String>(StringUtils.getRandomString(rnd, 10, 100)), + new Tuple1<String>("abc"), + new Tuple1<String>(""), + new Tuple1<String>(StringUtils.getRandomString(rnd, 30, 170)), + new Tuple1<String>(StringUtils.getRandomString(rnd, 15, 50)), + new Tuple1<String>("") + }; + + runTests(testTuples); + } + /** Covers Tuple1 holding String arrays that mix literals, empty strings and random strings from a fixed seed. */ + @Test + public void testTuple1StringArray() { + Random rnd = new Random(289347567856686223L); + + String[] arr1 = new String[] {"abc", "", + StringUtils.getRandomString(rnd, 10, 100), + StringUtils.getRandomString(rnd, 15, 50), + StringUtils.getRandomString(rnd, 30, 170), + StringUtils.getRandomString(rnd, 14, 15), + ""}; + + String[] arr2 = new String[] {"foo", "", + StringUtils.getRandomString(rnd, 10, 100), + StringUtils.getRandomString(rnd, 1000, 5000), + StringUtils.getRandomString(rnd, 30000, 35000), + StringUtils.getRandomString(rnd, 100*1024, 105*1024), + "bar"}; + + @SuppressWarnings("unchecked") + Tuple1<String[]>[] testTuples = new Tuple1[] { + new Tuple1<String[]>(arr1), + new Tuple1<String[]>(arr2) + }; + + runTests(testTuples); + } + /** Covers Tuple2 of String and Double with values drawn from a fixed seed, including one tuple with an empty string. */ + @Test + public void testTuple2StringDouble() { + Random rnd = new Random(807346528946L); + + @SuppressWarnings("unchecked") + Tuple2<String, Double>[] testTuples = new Tuple2[] { + new Tuple2<String, Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble()), + new Tuple2<String, Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble()), + new Tuple2<String, Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble()), + new Tuple2<String, Double>("", rnd.nextDouble()), + new Tuple2<String, Double>(StringUtils.getRandomString(rnd, 10, 100), 
rnd.nextDouble()), + new Tuple2<String, Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble()) + }; + + runTests(testTuples); + } + /** Covers Tuple2 of String and String array. The two arrays are built once and reused across several tuples. */ + @Test + public void testTuple2StringStringArray() { + Random rnd = new Random(289347567856686223L); + + String[] arr1 = new String[] {"abc", "", + StringUtils.getRandomString(rnd, 10, 100), + StringUtils.getRandomString(rnd, 15, 50), + StringUtils.getRandomString(rnd, 30, 170), + StringUtils.getRandomString(rnd, 14, 15), + ""}; + + String[] arr2 = new String[] {"foo", "", + StringUtils.getRandomString(rnd, 10, 100), + StringUtils.getRandomString(rnd, 1000, 5000), + StringUtils.getRandomString(rnd, 30000, 35000), + StringUtils.getRandomString(rnd, 100*1024, 105*1024), + "bar"}; + + @SuppressWarnings("unchecked") + Tuple2<String, String[]>[] testTuples = new Tuple2[] { + new Tuple2<String, String[]>(StringUtils.getRandomString(rnd, 30, 170), arr1), + new Tuple2<String, String[]>(StringUtils.getRandomString(rnd, 30, 170), arr2), + new Tuple2<String, String[]>(StringUtils.getRandomString(rnd, 30, 170), arr1), + new Tuple2<String, String[]>(StringUtils.getRandomString(rnd, 30, 170), arr2), + new Tuple2<String, String[]>(StringUtils.getRandomString(rnd, 30, 170), arr2) + }; + + runTests(testTuples); + } + + /** Covers Tuple5 whose fields are the user-defined types SimpleTypes, Book, ComplexNestedObject1, BookAuthor and ComplexNestedObject2 imported from AbstractGenericTypeSerializerTest. Several field instances are shared between tuples. */ + @Test + public void testTuple5CustomObjects() { + Random rnd = new Random(807346528946L); + + SimpleTypes a = new SimpleTypes(); + SimpleTypes b = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(), + StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble()); + SimpleTypes c = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(), + StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble()); + SimpleTypes d = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(), + StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble()); + SimpleTypes e = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) 
rnd.nextInt(), + StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble()); + SimpleTypes f = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(), + StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble()); + SimpleTypes g = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(), + StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble()); + + ComplexNestedObject1 o1 = new ComplexNestedObject1(5626435); + ComplexNestedObject1 o2 = new ComplexNestedObject1(76923); + ComplexNestedObject1 o3 = new ComplexNestedObject1(-1100); + ComplexNestedObject1 o4 = new ComplexNestedObject1(0); + ComplexNestedObject1 o5 = new ComplexNestedObject1(44); + + ComplexNestedObject2 co1 = new ComplexNestedObject2(rnd); + ComplexNestedObject2 co2 = new ComplexNestedObject2(); + ComplexNestedObject2 co3 = new ComplexNestedObject2(rnd); + ComplexNestedObject2 co4 = new ComplexNestedObject2(rnd); + + Book b1 = new Book(976243875L, "The Serialization Odysse", 42); + Book b2 = new Book(0L, "Debugging byte streams", 1337); + Book b3 = new Book(-1L, "Low level interfaces", 0xC0FFEE); + Book b4 = new Book(Long.MAX_VALUE, "The joy of bits and bytes", 0xDEADBEEF); + Book b5 = new Book(Long.MIN_VALUE, "Winnign a prize for creative test strings", 0xBADF00); + Book b6 = new Book(-2L, "Distributed Systems", 0xABCDEF0123456789L); + + ArrayList<String> list = new ArrayList<String>(); + list.add("A"); + list.add("B"); + list.add("C"); + list.add("D"); + list.add("E"); + + BookAuthor ba1 = new BookAuthor(976243875L, list, "Arno Nym"); + + ArrayList<String> list2 = new ArrayList<String>(); + BookAuthor ba2 = new BookAuthor(987654321L, list2, "The Saurus"); + + + @SuppressWarnings("unchecked") + Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>[] testTuples = new Tuple5[] { + new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(a, b1, o1, 
ba1, co1), + new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(b, b2, o2, ba2, co2), + new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(c, b3, o3, ba1, co3), + new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(d, b2, o4, ba1, co4), + new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(e, b4, o5, ba2, co4), + new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(f, b5, o1, ba2, co4), + new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(g, b6, o4, ba1, co2) + }; + + runTests(testTuples); + } + /** Shared driver for all tests. Derives the TupleTypeInfo from the first instance, creates its serializer with a default ExecutionConfig and runs TupleSerializerTestInstance.testAll over all instances. The length handed to the test instance is 1 for Tuple0 and -1 for every other tuple class. Any exception is printed and converted into a test failure carrying only its message. NOTE(review): instances[0] is read unconditionally, so an empty argument list would end in the catch block and fail the test; every caller in this class passes at least one tuple. NOTE(review): the cast to TupleTypeInfo is unchecked and relies on the TypeExtractor returning tuple type information for tuple instances - confirm. */ + private <T extends Tuple> void runTests(T... instances) { + try { + TupleTypeInfo<T> tupleTypeInfo = (TupleTypeInfo<T>) TypeExtractor.getForObject(instances[0]); + TypeSerializer<T> serializer = tupleTypeInfo.createSerializer(new ExecutionConfig()); + + Class<T> tupleClass = tupleTypeInfo.getTypeClass(); + + int length = -1; + if(tupleClass == Tuple0.class) { + length = 1; + } + TupleSerializerTestInstance<T> test = new TupleSerializerTestInstance<T>(serializer, tupleClass, length, instances); + test.testAll(); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } +}