http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
index 2427edd..d12307a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
@@ -18,13 +18,24 @@
 
 package org.apache.flink.runtime.operators.testutils;
 
-import java.util.Comparator;
+import java.io.IOException;
 import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.java.tuple.Tuple;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.operators.testutils.types.IntPair;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
 import org.apache.flink.util.MutableObjectIterator;
 
 /**
@@ -38,250 +49,6 @@ public final class TestData {
        private TestData() {}
 
        /**
-        * Key comparator.
-        */
-       public static class KeyComparator implements Comparator<Key> {
-               @Override
-               public int compare(Key k1, Key k2) {
-                       return k1.compareTo(k2);
-               }
-       };
-
-       /**
-        * Key implementation.
-        */
-       public static class Key extends IntValue {
-               private static final long serialVersionUID = 1L;
-               
-               public Key() {
-                       super();
-               }
-
-               public Key(int k) {
-                       super(k);
-               }
-
-               public int getKey() {
-                       return getValue();
-               }
-               
-               public void setKey(int key) {
-                       setValue(key);
-               }
-       }
-
-       /**
-        * Value implementation.
-        */
-       public static class Value extends StringValue {
-               
-               private static final long serialVersionUID = 1L;
-
-               public Value() {
-                       super();
-               }
-
-               public Value(String v) {
-                       super(v);
-               }
-               
-               @Override
-               public boolean equals(final Object obj) {
-                       if (this == obj) {
-                               return true;
-                       }
-                       
-                       if (obj.getClass() == TestData.Value.class) {
-                               final StringValue other = (StringValue) obj;
-                               int len = this.length();
-                               
-                               if (len == other.length()) {
-                                       final char[] tc = this.getCharArray();
-                                       final char[] oc = other.getCharArray();
-                                       int i = 0, j = 0;
-                                       
-                                       while (len-- != 0) {
-                                               if (tc[i++] != oc[j++]) {
-                                                       return false;
-                                               }
-                                       }
-                                       return true;
-                               }
-                       }
-                       return false;
-               }
-       }
-
-       /**
-        * Pair generator.
-        */
-       public static class Generator implements MutableObjectIterator<Record> {
-               
-               public enum KeyMode {
-                       SORTED, RANDOM
-               };
-
-               public enum ValueMode {
-                       FIX_LENGTH, RANDOM_LENGTH, CONSTANT
-               };
-
-               private static char[] alpha = { 'a', 'b', 'c', 'd', 'e', 'f', 
'g', 'h', 'i', 'j', 'k', 'l', 'm', 'a', 'b', 'c',
-                       'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm' };
-
-               private final long seed;
-
-               private final int keyMax;
-
-               private final int valueLength;
-
-               private final KeyMode keyMode;
-
-               private final ValueMode valueMode;
-
-               private Random random;
-
-               private int counter;
-
-               private Key key;
-               private Value value;
-
-               public Generator(long seed, int keyMax, int valueLength) {
-                       this(seed, keyMax, valueLength, KeyMode.RANDOM, 
ValueMode.FIX_LENGTH);
-               }
-
-               public Generator(long seed, int keyMax, int valueLength, 
KeyMode keyMode, ValueMode valueMode) {
-                       this(seed, keyMax, valueLength, keyMode, valueMode, 
null);
-               }
-               
-               public Generator(long seed, int keyMax, int valueLength, 
KeyMode keyMode, ValueMode valueMode, Value constant) {
-                       this.seed = seed;
-                       this.keyMax = keyMax;
-                       this.valueLength = valueLength;
-                       this.keyMode = keyMode;
-                       this.valueMode = valueMode;
-
-                       this.random = new Random(seed);
-                       this.counter = 0;
-                       
-                       this.key = new Key();
-                       this.value = constant == null ? new Value() : constant;
-               }
-
-               public Record next(Record reuse) {
-                       this.key.setKey(keyMode == KeyMode.SORTED ? ++counter : 
Math.abs(random.nextInt() % keyMax) + 1);
-                       if (this.valueMode != ValueMode.CONSTANT) {
-                               this.value.setValue(randomString());
-                       }
-                       reuse.setField(0, this.key);
-                       reuse.setField(1, this.value);
-                       return reuse;
-               }
-
-               public Record next() {
-                       return next(new Record(2));
-               }
-
-               public boolean next(org.apache.flink.types.Value[] target) {
-                       this.key.setKey(keyMode == KeyMode.SORTED ? ++counter : 
Math.abs(random.nextInt() % keyMax) + 1);
-                       // TODO change this to something proper
-                       ((IntValue)target[0]).setValue(this.key.getValue());
-                       ((IntValue)target[1]).setValue(random.nextInt());
-                       return true;
-               }
-
-               public int sizeOf(Record rec) {
-                       // key
-                       int valueLength = Integer.SIZE / 8;
-
-                       // value
-                       String text = rec.getField(1, Value.class).getValue();
-                       int strlen = text.length();
-                       int utflen = 0;
-                       int c;
-                       for (int i = 0; i < strlen; i++) {
-                               c = text.charAt(i);
-                               if ((c >= 0x0001) && (c <= 0x007F)) {
-                                       utflen++;
-                               } else if (c > 0x07FF) {
-                                       utflen += 3;
-                               } else {
-                                       utflen += 2;
-                               }
-                       }
-                       valueLength += 2 + utflen;
-
-                       return valueLength;
-               }
-
-               public void reset() {
-                       this.random = new Random(seed);
-                       this.counter = 0;
-               }
-
-               private String randomString() {
-                       int length;
-
-                       if (valueMode == ValueMode.FIX_LENGTH) {
-                               length = valueLength;
-                       } else {
-                               length = valueLength - 
random.nextInt(valueLength / 3);
-                       }
-
-                       StringBuilder sb = new StringBuilder();
-                       for (int i = 0; i < length; i++) {
-                               sb.append(alpha[random.nextInt(alpha.length)]);
-                       }
-                       return sb.toString();
-               }
-
-       }
-       
-       /**
-        * Record reader mock.
-        */
-       public static class GeneratorIterator implements 
MutableObjectIterator<Record> {
-               
-               private final Generator generator;
-
-               private final int numberOfRecords;
-
-               private int counter;
-
-               public GeneratorIterator(Generator generator, int 
numberOfRecords) {
-                       this.generator = generator;
-                       this.generator.reset();
-                       this.numberOfRecords = numberOfRecords;
-                       this.counter = 0;
-               }
-
-               @Override
-               public Record next(Record target) {
-                       if (counter < numberOfRecords) {
-                               counter++;
-                               return generator.next(target);
-                       }
-                       else {
-                               return null;
-                       }
-               }
-
-               @Override
-               public Record next() {
-                       if (counter < numberOfRecords) {
-                               counter++;
-                               return generator.next();
-                       }
-                       else {
-                               return null;
-                       }
-               }
-               
-               public void reset() {
-                       this.counter = 0;
-               }
-       }
-
-       /**
         * Tuple2<Integer, String> generator.
         */
        public static class TupleGenerator implements 
MutableObjectIterator<Tuple2<Integer, String>> {
@@ -398,9 +165,8 @@ public final class TestData {
 
        }
 
-
        /**
-        * Record reader mock.
+        * Tuple reader mock.
         */
        public static class TupleGeneratorIterator implements 
MutableObjectIterator<Tuple2<Integer, String>> {
 
@@ -443,35 +209,31 @@ public final class TestData {
                        this.counter = 0;
                }
        }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public static class ConstantValueIterator implements 
MutableObjectIterator<Record> {
-               
-               private final Key key;
-               private final Value value;
-               
+
+       public static class TupleConstantValueIterator implements 
MutableObjectIterator<Tuple2<Integer, String>> {
+
+               private int key;
+               private String value;
+
                private final String valueValue;
-               
-               
+
+
                private final int numPairs;
-               
+
                private int pos;
-               
-               
-               public ConstantValueIterator(int keyValue, String valueValue, 
int numPairs) {
-                       this.key = new Key(keyValue);
-                       this.value = new Value();
+
+
+               public TupleConstantValueIterator(int keyValue, String 
valueValue, int numPairs) {
+                       this.key = keyValue;
                        this.valueValue = valueValue;
                        this.numPairs = numPairs;
                }
-               
+
                @Override
-               public Record next(Record reuse) {
+               public Tuple2<Integer, String> next(Tuple2<Integer, String> 
reuse) {
                        if (pos < this.numPairs) {
-                               this.value.setValue(this.valueValue + ' ' + 
pos);
-                               reuse.setField(0, this.key);
-                               reuse.setField(1, this.value);
+                               this.value = this.valueValue + ' ' + pos;
+                               reuse.setFields(this.key, this.value);
                                pos++;
                                return reuse;
                        }
@@ -481,8 +243,8 @@ public final class TestData {
                }
 
                @Override
-               public Record next() {
-                       return next(new Record(2));
+               public Tuple2<Integer, String> next() {
+                       return next(new Tuple2<Integer, String>());
                }
 
                public void reset() {
@@ -490,45 +252,307 @@ public final class TestData {
                }
        }
 
-       public static class TupleConstantValueIterator implements 
MutableObjectIterator<Tuple2<Integer, String>> {
+       /**
+        * An iterator that returns the same key/value pair a given number of times.
+        */
+       public static final class ConstantIntIntTuplesIterator implements 
MutableObjectIterator<Tuple2<Integer, Integer>> {
 
-               private int key;
-               private String value;
+               private final int key;
+               private final int value;
 
-               private final String valueValue;
+               private int numLeft;
 
+               public ConstantIntIntTuplesIterator(int key, int value, int 
count) {
+                       this.key = key;
+                       this.value = value;
+                       this.numLeft = count;
+               }
 
-               private final int numPairs;
+               @Override
+               public Tuple2<Integer, Integer> next(Tuple2<Integer, Integer> 
reuse) {
+                       if (this.numLeft > 0) {
+                               this.numLeft--;
+                               reuse.setField(this.key, 0);
+                               reuse.setField(this.value, 1);
+                               return reuse;
+                       } else {
+                               return null;
+                       }
+               }
 
-               private int pos;
+               @Override
+               public Tuple2<Integer, Integer> next() {
+                       return next(new Tuple2<>(0, 0));
+               }
+       }
 
+       //----Tuple2<Integer, String>
+       private static final TupleTypeInfo<Tuple2<Integer, String>> 
typeInfoIntString = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, 
String.class);
 
-               public TupleConstantValueIterator(int keyValue, String 
valueValue, int numPairs) {
-                       this.key = keyValue;
-                       this.valueValue = valueValue;
-                       this.numPairs = numPairs;
+       private static final TypeSerializerFactory<Tuple2<Integer, String>> serializerFactoryIntString = new MockTupleSerializerFactory<>(typeInfoIntString); // diamond keeps the factory typed, no unchecked conversion
+
+       public static TupleTypeInfo<Tuple2<Integer, String>> 
getIntStringTupleTypeInfo() {
+               return typeInfoIntString;
+       }
+
+       public static TypeSerializerFactory<Tuple2<Integer, String>> 
getIntStringTupleSerializerFactory() {
+               return serializerFactoryIntString;
+       }
+
+       public static TypeSerializer<Tuple2<Integer, String>> 
getIntStringTupleSerializer() {
+               return serializerFactoryIntString.getSerializer();
+       }
+
+       public static TypeComparator<Tuple2<Integer, String>> 
getIntStringTupleComparator() {
+               return getIntStringTupleTypeInfo().createComparator(new 
int[]{0}, new boolean[]{true}, 0, null);
+       }
+
+       public static MockTuple2Reader<Tuple2<Integer, String>> 
getIntStringTupleReader() {
+               return new MockTuple2Reader<Tuple2<Integer, String>>();
+       }
+
+       //----Tuple2<Integer, Integer>
+       private static final TupleTypeInfo<Tuple2<Integer, Integer>> 
typeInfoIntInt = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, 
Integer.class);
+
+       private static final TypeSerializerFactory<Tuple2<Integer, Integer>> serializerFactoryIntInt = new MockTupleSerializerFactory<>(typeInfoIntInt); // diamond keeps the factory typed, no unchecked conversion
+
+       public static TupleTypeInfo<Tuple2<Integer, Integer>> 
getIntIntTupleTypeInfo() {
+               return typeInfoIntInt;
+       }
+
+       public static TypeSerializerFactory<Tuple2<Integer, Integer>> 
getIntIntTupleSerializerFactory() {
+               return serializerFactoryIntInt;
+       }
+
+       public static TypeSerializer<Tuple2<Integer, Integer>> 
getIntIntTupleSerializer() {
+               return getIntIntTupleSerializerFactory().getSerializer();
+       }
+
+       public static TypeComparator<Tuple2<Integer, Integer>> 
getIntIntTupleComparator() {
+               return getIntIntTupleTypeInfo().createComparator(new int[]{0}, 
new boolean[]{true}, 0, null);
+       }
+
+       public static MockTuple2Reader<Tuple2<Integer, Integer>> 
getIntIntTupleReader() {
+               return new MockTuple2Reader<>();
+       }
+
+       //----Tuple2<?, ?>
+       private static class MockTupleSerializerFactory<T extends Tuple> 
implements TypeSerializerFactory<T> {
+               private final TupleTypeInfo<T> info;
+
+               public MockTupleSerializerFactory(TupleTypeInfo<T> info) {
+                       this.info = info;
                }
 
                @Override
-               public Tuple2<Integer, String> next(Tuple2<Integer, String> 
reuse) {
-                       if (pos < this.numPairs) {
-                               this.value = this.valueValue + ' ' + pos;
-                               reuse.setFields(this.key, this.value);
-                               pos++;
+               public void writeParametersToConfig(Configuration config) {
+                       throw new UnsupportedOperationException("Not supported 
yet.");
+               }
+
+               @Override
+               public void readParametersFromConfig(Configuration config, 
ClassLoader cl) throws ClassNotFoundException {
+                       throw new UnsupportedOperationException("Not supported 
yet.");
+               }
+
+               @Override
+               public TypeSerializer<T> getSerializer() {
+                       return info.createSerializer(null);
+               }
+
+               @Override
+               public Class<T> getDataType() {
+                       return info.getTypeClass();
+               }
+       }
+
+       public static class MockTuple2Reader<T extends Tuple2> implements MutableObjectIterator<T> {
+               private final Tuple2 SENTINEL = new Tuple2(); // end-of-input marker; recognised by identity, never by equals()
+
+               private final BlockingQueue<Tuple2> queue;
+
+               public MockTuple2Reader() {
+                       this.queue = new ArrayBlockingQueue<Tuple2>(32, false);
+               }
+
+               public MockTuple2Reader(int size) {
+                       this.queue = new ArrayBlockingQueue<Tuple2>(size, false);
+               }
+
+               @Override
+               public T next(T reuse) {
+                       Tuple2 r = null;
+                       while (r == null) {
+                               try {
+                                       r = queue.take();
+                               } catch (InterruptedException iex) {
+                                       throw new RuntimeException("Reader was interrupted.", iex); // keep the cause for diagnosis
+                               }
+                       }
+
+                       if (r == SENTINEL) { // identity check: an emitted tuple with two null fields must not be mistaken for end of input
+                               // put the sentinel back, to ensure that repeated calls do not block
+                               try {
+                                       queue.put(r);
+                               } catch (InterruptedException e) {
+                                       throw new RuntimeException("Reader was interrupted.", e); // keep the cause for diagnosis
+                               }
+                               return null;
+                       } else {
+                               reuse.setField(r.getField(0), 0);
+                               reuse.setField(r.getField(1), 1);
                                return reuse;
                        }
-                       else {
+               }
+
+               @Override
+               public T next() {
+                       Tuple2 r = null;
+                       while (r == null) {
+                               try {
+                                       r = queue.take();
+                               } catch (InterruptedException iex) {
+                                       throw new RuntimeException("Reader was interrupted.", iex); // keep the cause for diagnosis
+                               }
+                       }
+
+                       if (r == SENTINEL) { // identity check: an emitted tuple with two null fields must not be mistaken for end of input
+                               // put the sentinel back, to ensure that repeated calls do not block
+                               try {
+                                       queue.put(r);
+                               } catch (InterruptedException e) {
+                                       throw new RuntimeException("Reader was interrupted.", e); // keep the cause for diagnosis
+                               }
                                return null;
+                       } else {
+                               Tuple2 result = new Tuple2(r.f0, r.f1);
+                               return (T) result;
+                       }
+               }
+
+               public void emit(Tuple2 element) throws InterruptedException {
+                       queue.put(new Tuple2(element.f0, element.f1)); // enqueue a copy, so a queued element is never the sentinel instance
+               }
+
+               public void close() {
+                       try {
+                               queue.put(SENTINEL);
+                       } catch (InterruptedException e) {
+                               throw new RuntimeException(e);
                        }
                }
+       }
+       
+       public static class IntPairComparator extends TypeComparator<IntPair> {
+
+               private static final long serialVersionUID = 1L;
+
+               private int reference;
+
+               private final TypeComparator[] comparators = new 
TypeComparator[]{new IntComparator(true)};
 
                @Override
-               public Tuple2<Integer, String> next() {
-                       return next(new Tuple2<Integer, String>());
+               public int hash(IntPair object) {
+                       return comparators[0].hash(object.getKey());
                }
 
-               public void reset() {
-                       this.pos = 0;
+               @Override
+               public void setReference(IntPair toCompare) {
+                       this.reference = toCompare.getKey();
+               }
+
+               @Override
+               public boolean equalToReference(IntPair candidate) {
+                       return candidate.getKey() == this.reference;
+               }
+
+               @Override
+               public int compareToReference(TypeComparator<IntPair> referencedAccessors) {
+                       final IntPairComparator comp = (IntPairComparator) referencedAccessors;
+                       return Integer.compare(comp.reference, this.reference); // subtraction overflows for keys far apart and flips the sign
+               }
+
+               @Override
+               public int compare(IntPair first, IntPair second) {
+                       return Integer.compare(first.getKey(), second.getKey()); // subtraction overflows for keys far apart and flips the sign
+               }
+
+               @Override
+               public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
+                       return Integer.compare(source1.readInt(), source2.readInt()); // reads source1 then source2 as before; avoids subtraction overflow
+               }
+
+               @Override
+               public boolean supportsNormalizedKey() {
+                       return true;
+               }
+
+               @Override
+               public int getNormalizeKeyLen() {
+                       return 4;
+               }
+
+               @Override
+               public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+                       return keyBytes < 4;
+               }
+
+               @Override
+               public void putNormalizedKey(IntPair record, MemorySegment 
target, int offset, int len) {
+                       // see IntValue for a documentation of the logic
+                       final int value = record.getKey() - Integer.MIN_VALUE;
+
+                       if (len == 4) {
+                               target.putIntBigEndian(offset, value);
+                       } else if (len <= 0) {
+                       } else if (len < 4) {
+                               for (int i = 0; len > 0; len--, i++) {
+                                       target.put(offset + i, (byte) ((value 
>>> ((3 - i) << 3)) & 0xff));
+                               }
+                       } else {
+                               target.putIntBigEndian(offset, value);
+                               for (int i = 4; i < len; i++) {
+                                       target.put(offset + i, (byte) 0);
+                               }
+                       }
+               }
+
+               @Override
+               public boolean invertNormalizedKey() {
+                       return false;
+               }
+
+               @Override
+               public IntPairComparator duplicate() {
+                       return new IntPairComparator();
+               }
+
+               @Override
+               public int extractKeys(Object record, Object[] target, int 
index) {
+                       target[index] = ((IntPair) record).getKey();
+                       return 1;
+               }
+
+               @Override
+               public TypeComparator[] getFlatComparators() {
+                       return comparators;
+               }
+
+               @Override
+               public boolean supportsSerializationWithKeyNormalization() {
+                       return true;
+               }
+
+               @Override
+               public void writeWithKeyNormalization(IntPair record, 
DataOutputView target) throws IOException {
+                       target.writeInt(record.getKey() - Integer.MIN_VALUE);
+                       target.writeInt(record.getValue());
+               }
+
+               @Override
+               public IntPair readWithKeyDenormalization(IntPair reuse, 
DataInputView source) throws IOException {
+                       reuse.setKey(source.readInt() + Integer.MIN_VALUE);
+                       reuse.setValue(source.readInt());
+                       return reuse;
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index f112ff8..4c5a07e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -18,13 +18,12 @@
 
 package org.apache.flink.runtime.operators.util;
 
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.functions.JoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -37,10 +36,8 @@ import 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import 
org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
+import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
@@ -78,21 +75,21 @@ public class HashVsSortMiniBenchmark {
        private IOManager ioManager;
        private MemoryManager memoryManager;
        
-       private TypeSerializerFactory<Record> serializer1;
-       private TypeSerializerFactory<Record> serializer2;
-       private TypeComparator<Record> comparator1;
-       private TypeComparator<Record> comparator2;
-       private TypePairComparator<Record, Record> pairComparator11;
+       private TypeSerializerFactory<Tuple2<Integer, String>> serializer1;
+       private TypeSerializerFactory<Tuple2<Integer, String>> serializer2;
+       private TypeComparator<Tuple2<Integer, String>> comparator1;
+       private TypeComparator<Tuple2<Integer, String>> comparator2;
+       private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, 
String>> pairComparator11;
 
 
        @SuppressWarnings("unchecked")
        @Before
        public void beforeTest() {
-               this.serializer1 = RecordSerializerFactory.get();
-               this.serializer2 = RecordSerializerFactory.get();
-               this.comparator1 = new RecordComparator(new int[] {0}, new 
Class[] {TestData.Key.class});
-               this.comparator2 = new RecordComparator(new int[] {0}, new 
Class[] {TestData.Key.class});
-               this.pairComparator11 = new RecordPairComparator(new int[] {0}, 
new int[] {0}, new Class[] {TestData.Key.class});
+               this.serializer1 = 
TestData.getIntStringTupleSerializerFactory();
+               this.serializer2 = 
TestData.getIntStringTupleSerializerFactory();
+               this.comparator1 = TestData.getIntStringTupleComparator();
+               this.comparator2 = TestData.getIntStringTupleComparator();
+               this.pairComparator11 = new 
GenericPairComparator(this.comparator1, this.comparator2);
                
                this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, 
PAGE_SIZE, MemoryType.HEAP, true);
                this.ioManager = new IOManagerAsync();
@@ -120,31 +117,31 @@ public class HashVsSortMiniBenchmark {
        public void testSortBothMerge() {
                try {
                        
-                       Generator generator1 = new Generator(SEED1, 
INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-                       Generator generator2 = new Generator(SEED2, 
INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+                       TestData.TupleGenerator generator1 = new 
TestData.TupleGenerator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, 
ValueMode.RANDOM_LENGTH);
+                       TestData.TupleGenerator generator2 = new 
TestData.TupleGenerator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, 
ValueMode.RANDOM_LENGTH);
 
-                       final TestData.GeneratorIterator input1 = new 
TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-                       final TestData.GeneratorIterator input2 = new 
TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+                       final TestData.TupleGeneratorIterator input1 = new 
TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+                       final TestData.TupleGeneratorIterator input2 = new 
TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
                        
-                       final JoinFunction matcher = new NoOpMatcher();
-                       final Collector<Record> collector = new 
DiscardingOutputCollector<Record>();
+                       final FlatJoinFunction matcher = new NoOpMatcher();
+                       final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<>();
                        
                        long start = System.nanoTime();
                        
-                       final UnilateralSortMerger<Record> sorter1 = new 
UnilateralSortMerger<Record>(
+                       final UnilateralSortMerger<Tuple2<Integer, String>> 
sorter1 = new UnilateralSortMerger<>(
                                        this.memoryManager, this.ioManager, 
input1, this.parentTask, this.serializer1, 
                                        this.comparator1.duplicate(), 
MEMORY_FOR_SORTER, 128, 0.8f, true);
                        
-                       final UnilateralSortMerger<Record> sorter2 = new 
UnilateralSortMerger<Record>(
+                       final UnilateralSortMerger<Tuple2<Integer, String>> 
sorter2 = new UnilateralSortMerger<>(
                                        this.memoryManager, this.ioManager, 
input2, this.parentTask, this.serializer2, 
                                        this.comparator2.duplicate(), 
MEMORY_FOR_SORTER, 128, 0.8f, true);
                        
-                       final MutableObjectIterator<Record> sortedInput1 = 
sorter1.getIterator();
-                       final MutableObjectIterator<Record> sortedInput2 = 
sorter2.getIterator();
+                       final MutableObjectIterator<Tuple2<Integer, String>> 
sortedInput1 = sorter1.getIterator();
+                       final MutableObjectIterator<Tuple2<Integer, String>> 
sortedInput2 = sorter2.getIterator();
                        
                        // compare with iterator values
-                       ReusingMergeInnerJoinIterator<Record, Record, Record> 
iterator =
-                               new ReusingMergeInnerJoinIterator<Record, 
Record, Record>(sortedInput1, sortedInput2,
+                       ReusingMergeInnerJoinIterator<Tuple2<Integer, String>, 
Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+                               new 
ReusingMergeInnerJoinIterator<>(sortedInput1, sortedInput2,
                                                
this.serializer1.getSerializer(), this.comparator1, 
this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
                                                this.memoryManager, 
this.ioManager, MEMORY_PAGES_FOR_MERGE, this.parentTask);
                        
@@ -170,21 +167,21 @@ public class HashVsSortMiniBenchmark {
        @Test
        public void testBuildFirst() {
                try {
-                       Generator generator1 = new Generator(SEED1, 
INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-                       Generator generator2 = new Generator(SEED2, 
INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+                       TestData.TupleGenerator generator1 = new 
TestData.TupleGenerator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, 
ValueMode.RANDOM_LENGTH);
+                       TestData.TupleGenerator generator2 = new 
TestData.TupleGenerator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, 
ValueMode.RANDOM_LENGTH);
                        
-                       final TestData.GeneratorIterator input1 = new 
TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-                       final TestData.GeneratorIterator input2 = new 
TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+                       final TestData.TupleGeneratorIterator input1 = new 
TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+                       final TestData.TupleGeneratorIterator input2 = new 
TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
                        
-                       final JoinFunction matcher = new NoOpMatcher();
+                       final FlatJoinFunction matcher = new NoOpMatcher();
                        
-                       final Collector<Record> collector = new 
DiscardingOutputCollector<Record>();
+                       final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<>();
                        
                        long start = System.nanoTime();
                        
                        // compare with iterator values
-                       final ReusingBuildFirstHashMatchIterator<Record, 
Record, Record> iterator =
-                                       new 
ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
+                       final 
ReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, 
String>, Tuple2<Integer, String>> iterator =
+                                       new 
ReusingBuildFirstHashMatchIterator<>(
                                                input1, input2, 
this.serializer1.getSerializer(), this.comparator1, 
                                                        
this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
                                                        this.memoryManager, 
this.ioManager, this.parentTask, MEMORY_SIZE, true);
@@ -209,21 +206,21 @@ public class HashVsSortMiniBenchmark {
        @Test
        public void testBuildSecond() {
                try {
-                       Generator generator1 = new Generator(SEED1, 
INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-                       Generator generator2 = new Generator(SEED2, 
INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+                       TestData.TupleGenerator generator1 = new 
TestData.TupleGenerator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, 
ValueMode.RANDOM_LENGTH);
+                       TestData.TupleGenerator generator2 = new 
TestData.TupleGenerator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, 
ValueMode.RANDOM_LENGTH);
                        
-                       final TestData.GeneratorIterator input1 = new 
TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-                       final TestData.GeneratorIterator input2 = new 
TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+                       final TestData.TupleGeneratorIterator input1 = new 
TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+                       final TestData.TupleGeneratorIterator input2 = new 
TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
                        
-                       final JoinFunction matcher = new NoOpMatcher();
+                       final FlatJoinFunction matcher = new NoOpMatcher();
                        
-                       final Collector<Record> collector = new 
DiscardingOutputCollector<Record>();
+                       final Collector<Tuple2<Integer, String>> collector = 
new DiscardingOutputCollector<>();
                        
                        long start = System.nanoTime();
                        
                        // compare with iterator values
-                       ReusingBuildSecondHashMatchIterator<Record, Record, 
Record> iterator =
-                                       new 
ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
+                       ReusingBuildSecondHashMatchIterator<Tuple2<Integer, 
String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+                                       new 
ReusingBuildSecondHashMatchIterator<>(
                                                input1, input2, 
this.serializer1.getSerializer(), this.comparator1, 
                                                
this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
                                                this.memoryManager, 
this.ioManager, this.parentTask, MEMORY_SIZE, true);
@@ -246,11 +243,11 @@ public class HashVsSortMiniBenchmark {
        }
        
        
-       private static final class NoOpMatcher extends JoinFunction {
+       private static final class NoOpMatcher implements 
FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, 
Tuple2<Integer, String>> {
                private static final long serialVersionUID = 1L;
                
                @Override
-               public void join(Record rec1, Record rec2, Collector<Record> 
out) throws Exception {
+               public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, 
String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception {
                }
        }
 }

Reply via email to