http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java index 2427edd..d12307a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java @@ -18,13 +18,24 @@ package org.apache.flink.runtime.operators.testutils; -import java.util.Comparator; +import java.io.IOException; import java.util.Random; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerFactory; +import org.apache.flink.api.common.typeutils.base.IntComparator; +import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.operators.testutils.types.IntPair; import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; -import org.apache.flink.types.StringValue; import org.apache.flink.util.MutableObjectIterator; /** @@ -38,250 +49,6 @@ public final class TestData { private TestData() {} /** - * Key comparator. - */ - public static class KeyComparator implements Comparator<Key> { - @Override - public int compare(Key k1, Key k2) { - return k1.compareTo(k2); - } - }; - - /** - * Key implementation. - */ - public static class Key extends IntValue { - private static final long serialVersionUID = 1L; - - public Key() { - super(); - } - - public Key(int k) { - super(k); - } - - public int getKey() { - return getValue(); - } - - public void setKey(int key) { - setValue(key); - } - } - - /** - * Value implementation. - */ - public static class Value extends StringValue { - - private static final long serialVersionUID = 1L; - - public Value() { - super(); - } - - public Value(String v) { - super(v); - } - - @Override - public boolean equals(final Object obj) { - if (this == obj) { - return true; - } - - if (obj.getClass() == TestData.Value.class) { - final StringValue other = (StringValue) obj; - int len = this.length(); - - if (len == other.length()) { - final char[] tc = this.getCharArray(); - final char[] oc = other.getCharArray(); - int i = 0, j = 0; - - while (len-- != 0) { - if (tc[i++] != oc[j++]) { - return false; - } - } - return true; - } - } - return false; - } - } - - /** - * Pair generator. - */ - public static class Generator implements MutableObjectIterator<Record> { - - public enum KeyMode { - SORTED, RANDOM - }; - - public enum ValueMode { - FIX_LENGTH, RANDOM_LENGTH, CONSTANT - }; - - private static char[] alpha = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'a', 'b', 'c', - 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm' }; - - private final long seed; - - private final int keyMax; - - private final int valueLength; - - private final KeyMode keyMode; - - private final ValueMode valueMode; - - private Random random; - - private int counter; - - private Key key; - private Value value; - - public Generator(long seed, int keyMax, int valueLength) { - this(seed, keyMax, valueLength, KeyMode.RANDOM, ValueMode.FIX_LENGTH); - } - - public Generator(long seed, int keyMax, int valueLength, KeyMode keyMode, ValueMode valueMode) { - this(seed, keyMax, valueLength, keyMode, valueMode, null); - } - - public Generator(long seed, int keyMax, int valueLength, KeyMode keyMode, ValueMode valueMode, Value constant) { - this.seed = seed; - this.keyMax = keyMax; - this.valueLength = valueLength; - this.keyMode = keyMode; - this.valueMode = valueMode; - - this.random = new Random(seed); - this.counter = 0; - - this.key = new Key(); - this.value = constant == null ? new Value() : constant; - } - - public Record next(Record reuse) { - this.key.setKey(keyMode == KeyMode.SORTED ? ++counter : Math.abs(random.nextInt() % keyMax) + 1); - if (this.valueMode != ValueMode.CONSTANT) { - this.value.setValue(randomString()); - } - reuse.setField(0, this.key); - reuse.setField(1, this.value); - return reuse; - } - - public Record next() { - return next(new Record(2)); - } - - public boolean next(org.apache.flink.types.Value[] target) { - this.key.setKey(keyMode == KeyMode.SORTED ? ++counter : Math.abs(random.nextInt() % keyMax) + 1); - // TODO change this to something proper - ((IntValue)target[0]).setValue(this.key.getValue()); - ((IntValue)target[1]).setValue(random.nextInt()); - return true; - } - - public int sizeOf(Record rec) { - // key - int valueLength = Integer.SIZE / 8; - - // value - String text = rec.getField(1, Value.class).getValue(); - int strlen = text.length(); - int utflen = 0; - int c; - for (int i = 0; i < strlen; i++) { - c = text.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - utflen++; - } else if (c > 0x07FF) { - utflen += 3; - } else { - utflen += 2; - } - } - valueLength += 2 + utflen; - - return valueLength; - } - - public void reset() { - this.random = new Random(seed); - this.counter = 0; - } - - private String randomString() { - int length; - - if (valueMode == ValueMode.FIX_LENGTH) { - length = valueLength; - } else { - length = valueLength - random.nextInt(valueLength / 3); - } - - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < length; i++) { - sb.append(alpha[random.nextInt(alpha.length)]); - } - return sb.toString(); - } - - } - - /** - * Record reader mock. - */ - public static class GeneratorIterator implements MutableObjectIterator<Record> { - - private final Generator generator; - - private final int numberOfRecords; - - private int counter; - - public GeneratorIterator(Generator generator, int numberOfRecords) { - this.generator = generator; - this.generator.reset(); - this.numberOfRecords = numberOfRecords; - this.counter = 0; - } - - @Override - public Record next(Record target) { - if (counter < numberOfRecords) { - counter++; - return generator.next(target); - } - else { - return null; - } - } - - @Override - public Record next() { - if (counter < numberOfRecords) { - counter++; - return generator.next(); - } - else { - return null; - } - } - - public void reset() { - this.counter = 0; - } - } - - /** * Tuple2<Integer, String> generator. */ public static class TupleGenerator implements MutableObjectIterator<Tuple2<Integer, String>> { @@ -398,9 +165,8 @@ public final class TestData { } - /** - * Record reader mock. + * Tuple reader mock. */ public static class TupleGeneratorIterator implements MutableObjectIterator<Tuple2<Integer, String>> { @@ -443,35 +209,31 @@ public final class TestData { this.counter = 0; } } - - // -------------------------------------------------------------------------------------------- - - public static class ConstantValueIterator implements MutableObjectIterator<Record> { - - private final Key key; - private final Value value; - + + public static class TupleConstantValueIterator implements MutableObjectIterator<Tuple2<Integer, String>> { + + private int key; + private String value; + private final String valueValue; - - + + private final int numPairs; - + private int pos; - - - public ConstantValueIterator(int keyValue, String valueValue, int numPairs) { - this.key = new Key(keyValue); - this.value = new Value(); + + + public TupleConstantValueIterator(int keyValue, String valueValue, int numPairs) { + this.key = keyValue; this.valueValue = valueValue; this.numPairs = numPairs; } - + @Override - public Record next(Record reuse) { + public Tuple2<Integer, String> next(Tuple2<Integer, String> reuse) { if (pos < this.numPairs) { - this.value.setValue(this.valueValue + ' ' + pos); - reuse.setField(0, this.key); - reuse.setField(1, this.value); + this.value = this.valueValue + ' ' + pos; + reuse.setFields(this.key, this.value); pos++; return reuse; } @@ -481,8 +243,8 @@ public final class TestData { } @Override - public Record next() { - return next(new Record(2)); + public Tuple2<Integer, String> next() { + return next(new Tuple2<Integer, String>()); } public void reset() { @@ -490,45 +252,307 @@ public final class TestData { } } - public static class TupleConstantValueIterator implements MutableObjectIterator<Tuple2<Integer, String>> { + /** + * An iterator that returns the Key/Value pairs with identical value a given number of times. + */ + public static final class ConstantIntIntTuplesIterator implements MutableObjectIterator<Tuple2<Integer, Integer>> { - private int key; - private String value; + private final int key; + private final int value; - private final String valueValue; + private int numLeft; + public ConstantIntIntTuplesIterator(int key, int value, int count) { + this.key = key; + this.value = value; + this.numLeft = count; + } - private final int numPairs; + @Override + public Tuple2<Integer, Integer> next(Tuple2<Integer, Integer> reuse) { + if (this.numLeft > 0) { + this.numLeft--; + reuse.setField(this.key, 0); + reuse.setField(this.value, 1); + return reuse; + } else { + return null; + } + } - private int pos; + @Override + public Tuple2<Integer, Integer> next() { + return next(new Tuple2<>(0, 0)); + } + } + //----Tuple2<Integer, String> + private static final TupleTypeInfo<Tuple2<Integer, String>> typeInfoIntString = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, String.class); - public TupleConstantValueIterator(int keyValue, String valueValue, int numPairs) { - this.key = keyValue; - this.valueValue = valueValue; - this.numPairs = numPairs; + private static final TypeSerializerFactory<Tuple2<Integer, String>> serializerFactoryIntString = new MockTupleSerializerFactory(typeInfoIntString); + + public static TupleTypeInfo<Tuple2<Integer, String>> getIntStringTupleTypeInfo() { + return typeInfoIntString; + } + + public static TypeSerializerFactory<Tuple2<Integer, String>> getIntStringTupleSerializerFactory() { + return serializerFactoryIntString; + } + + public static TypeSerializer<Tuple2<Integer, String>> getIntStringTupleSerializer() { + return serializerFactoryIntString.getSerializer(); + } + + public static TypeComparator<Tuple2<Integer, String>> getIntStringTupleComparator() { + return getIntStringTupleTypeInfo().createComparator(new int[]{0}, new boolean[]{true}, 0, null); + } + + public static MockTuple2Reader<Tuple2<Integer, String>> getIntStringTupleReader() { + return new MockTuple2Reader<Tuple2<Integer, String>>(); + } + + //----Tuple2<Integer, Integer> + private static final TupleTypeInfo<Tuple2<Integer, Integer>> typeInfoIntInt = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class); + + private static final TypeSerializerFactory<Tuple2<Integer, Integer>> serializerFactoryIntInt = new MockTupleSerializerFactory(typeInfoIntInt); + + public static TupleTypeInfo<Tuple2<Integer, Integer>> getIntIntTupleTypeInfo() { + return typeInfoIntInt; + } + + public static TypeSerializerFactory<Tuple2<Integer, Integer>> getIntIntTupleSerializerFactory() { + return serializerFactoryIntInt; + } + + public static TypeSerializer<Tuple2<Integer, Integer>> getIntIntTupleSerializer() { + return getIntIntTupleSerializerFactory().getSerializer(); + } + + public static TypeComparator<Tuple2<Integer, Integer>> getIntIntTupleComparator() { + return getIntIntTupleTypeInfo().createComparator(new int[]{0}, new boolean[]{true}, 0, null); + } + + public static MockTuple2Reader<Tuple2<Integer, Integer>> getIntIntTupleReader() { + return new MockTuple2Reader<>(); + } + + //----Tuple2<?, ?> + private static class MockTupleSerializerFactory<T extends Tuple> implements TypeSerializerFactory<T> { + private final TupleTypeInfo<T> info; + + public MockTupleSerializerFactory(TupleTypeInfo<T> info) { + this.info = info; } @Override - public Tuple2<Integer, String> next(Tuple2<Integer, String> reuse) { - if (pos < this.numPairs) { - this.value = this.valueValue + ' ' + pos; - reuse.setFields(this.key, this.value); - pos++; + public void writeParametersToConfig(Configuration config) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public TypeSerializer<T> getSerializer() { + return info.createSerializer(null); + } + + @Override + public Class<T> getDataType() { + return info.getTypeClass(); + } + } + + public static class MockTuple2Reader<T extends Tuple2> implements MutableObjectIterator<T> { + private final Tuple2 SENTINEL = new Tuple2(); + + private final BlockingQueue<Tuple2> queue; + + public MockTuple2Reader() { + this.queue = new ArrayBlockingQueue<Tuple2>(32, false); + } + + public MockTuple2Reader(int size) { + this.queue = new ArrayBlockingQueue<Tuple2>(size, false); + } + + @Override + public T next(T reuse) { + Tuple2 r = null; + while (r == null) { + try { + r = queue.take(); + } catch (InterruptedException iex) { + throw new RuntimeException("Reader was interrupted."); + } + } + + if (r.equals(SENTINEL)) { + // put the sentinel back, to ensure that repeated calls do not block + try { + queue.put(r); + } catch (InterruptedException e) { + throw new RuntimeException("Reader was interrupted."); + } + return null; + } else { + reuse.setField(r.getField(0), 0); + reuse.setField(r.getField(1), 1); return reuse; } - else { + } + + @Override + public T next() { + Tuple2 r = null; + while (r == null) { + try { + r = queue.take(); + } catch (InterruptedException iex) { + throw new RuntimeException("Reader was interrupted."); + } + } + + if (r.equals(SENTINEL)) { + // put the sentinel back, to ensure that repeated calls do not block + try { + queue.put(r); + } catch (InterruptedException e) { + throw new RuntimeException("Reader was interrupted."); + } return null; + } else { + Tuple2 result = new Tuple2(r.f0, r.f1); + return (T) result; + } + } + + public void emit(Tuple2 element) throws InterruptedException { + queue.put(new Tuple2(element.f0, element.f1)); + } + + public void close() { + try { + queue.put(SENTINEL); + } catch (InterruptedException e) { + throw new RuntimeException(e); } } + } + + public static class IntPairComparator extends TypeComparator<IntPair> { + + private static final long serialVersionUID = 1L; + + private int reference; + + private final TypeComparator[] comparators = new TypeComparator[]{new IntComparator(true)}; @Override - public Tuple2<Integer, String> next() { - return next(new Tuple2<Integer, String>()); + public int hash(IntPair object) { + return comparators[0].hash(object.getKey()); } - public void reset() { - this.pos = 0; + @Override + public void setReference(IntPair toCompare) { + this.reference = toCompare.getKey(); + } + + @Override + public boolean equalToReference(IntPair candidate) { + return candidate.getKey() == this.reference; + } + + @Override + public int compareToReference(TypeComparator<IntPair> referencedAccessors) { + final IntPairComparator comp = (IntPairComparator) referencedAccessors; + return comp.reference - this.reference; + } + + @Override + public int compare(IntPair first, IntPair second) { + return first.getKey() - second.getKey(); + } + + @Override + public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException { + return source1.readInt() - source2.readInt(); + } + + @Override + public boolean supportsNormalizedKey() { + return true; + } + + @Override + public int getNormalizeKeyLen() { + return 4; + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return keyBytes < 4; + } + + @Override + public void putNormalizedKey(IntPair record, MemorySegment target, int offset, int len) { + // see IntValue for a documentation of the logic + final int value = record.getKey() - Integer.MIN_VALUE; + + if (len == 4) { + target.putIntBigEndian(offset, value); + } else if (len <= 0) { + } else if (len < 4) { + for (int i = 0; len > 0; len--, i++) { + target.put(offset + i, (byte) ((value >>> ((3 - i) << 3)) & 0xff)); + } + } else { + target.putIntBigEndian(offset, value); + for (int i = 4; i < len; i++) { + target.put(offset + i, (byte) 0); + } + } + } + + @Override + public boolean invertNormalizedKey() { + return false; + } + + @Override + public IntPairComparator duplicate() { + return new IntPairComparator(); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + target[index] = ((IntPair) record).getKey(); + return 1; + } + + @Override + public TypeComparator[] getFlatComparators() { + return comparators; + } + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return true; + } + + @Override + public void writeWithKeyNormalization(IntPair record, DataOutputView target) throws IOException { + target.writeInt(record.getKey() - Integer.MIN_VALUE); + target.writeInt(record.getValue()); + } + + @Override + public IntPair readWithKeyDenormalization(IntPair reuse, DataInputView source) throws IOException { + reuse.setKey(source.readInt() + Integer.MIN_VALUE); + reuse.setValue(source.readInt()); + return reuse; } } }
http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java index f112ff8..4c5a07e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java @@ -18,13 +18,12 @@ package org.apache.flink.runtime.operators.util; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.typeutils.GenericPairComparator; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparator; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.api.common.typeutils.record.RecordComparator; -import org.apache.flink.api.common.typeutils.record.RecordPairComparator; -import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory; -import org.apache.flink.api.java.record.functions.JoinFunction; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; @@ -37,10 +36,8 @@ import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector; import org.apache.flink.runtime.operators.testutils.DummyInvokable; import org.apache.flink.runtime.operators.testutils.TestData; -import org.apache.flink.runtime.operators.testutils.TestData.Generator; -import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode; -import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode; -import org.apache.flink.types.Record; +import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode; +import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; import org.junit.After; @@ -78,21 +75,21 @@ public class HashVsSortMiniBenchmark { private IOManager ioManager; private MemoryManager memoryManager; - private TypeSerializerFactory<Record> serializer1; - private TypeSerializerFactory<Record> serializer2; - private TypeComparator<Record> comparator1; - private TypeComparator<Record> comparator2; - private TypePairComparator<Record, Record> pairComparator11; + private TypeSerializerFactory<Tuple2<Integer, String>> serializer1; + private TypeSerializerFactory<Tuple2<Integer, String>> serializer2; + private TypeComparator<Tuple2<Integer, String>> comparator1; + private TypeComparator<Tuple2<Integer, String>> comparator2; + private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> pairComparator11; @SuppressWarnings("unchecked") @Before public void beforeTest() { - this.serializer1 = RecordSerializerFactory.get(); - this.serializer2 = RecordSerializerFactory.get(); - this.comparator1 = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class}); - this.comparator2 = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class}); - this.pairComparator11 = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {TestData.Key.class}); + this.serializer1 = TestData.getIntStringTupleSerializerFactory(); + this.serializer2 = TestData.getIntStringTupleSerializerFactory(); + this.comparator1 = TestData.getIntStringTupleComparator(); + this.comparator2 = TestData.getIntStringTupleComparator(); + this.pairComparator11 = new GenericPairComparator(this.comparator1, this.comparator2); this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true); this.ioManager = new IOManagerAsync(); @@ -120,31 +117,31 @@ public class HashVsSortMiniBenchmark { public void testSortBothMerge() { try { - Generator generator1 = new Generator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); - Generator generator2 = new Generator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); - final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE); - final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE); + final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE); + final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE); - final JoinFunction matcher = new NoOpMatcher(); - final Collector<Record> collector = new DiscardingOutputCollector<Record>(); + final FlatJoinFunction matcher = new NoOpMatcher(); + final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>(); long start = System.nanoTime(); - final UnilateralSortMerger<Record> sorter1 = new UnilateralSortMerger<Record>( + final UnilateralSortMerger<Tuple2<Integer, String>> sorter1 = new UnilateralSortMerger<>( this.memoryManager, this.ioManager, input1, this.parentTask, this.serializer1, this.comparator1.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f, true); - final UnilateralSortMerger<Record> sorter2 = new UnilateralSortMerger<Record>( + final UnilateralSortMerger<Tuple2<Integer, String>> sorter2 = new UnilateralSortMerger<>( this.memoryManager, this.ioManager, input2, this.parentTask, this.serializer2, this.comparator2.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f, true); - final MutableObjectIterator<Record> sortedInput1 = sorter1.getIterator(); - final MutableObjectIterator<Record> sortedInput2 = sorter2.getIterator(); + final MutableObjectIterator<Tuple2<Integer, String>> sortedInput1 = sorter1.getIterator(); + final MutableObjectIterator<Tuple2<Integer, String>> sortedInput2 = sorter2.getIterator(); // compare with iterator values - ReusingMergeInnerJoinIterator<Record, Record, Record> iterator = - new ReusingMergeInnerJoinIterator<Record, Record, Record>(sortedInput1, sortedInput2, + ReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator = + new ReusingMergeInnerJoinIterator<>(sortedInput1, sortedInput2, this.serializer1.getSerializer(), this.comparator1, this.serializer2.getSerializer(), this.comparator2, this.pairComparator11, this.memoryManager, this.ioManager, MEMORY_PAGES_FOR_MERGE, this.parentTask); @@ -170,21 +167,21 @@ public class HashVsSortMiniBenchmark { @Test public void testBuildFirst() { try { - Generator generator1 = new Generator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); - Generator generator2 = new Generator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); - final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE); - final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE); + final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE); + final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE); - final JoinFunction matcher = new NoOpMatcher(); + final FlatJoinFunction matcher = new NoOpMatcher(); - final Collector<Record> collector = new DiscardingOutputCollector<Record>(); + final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>(); long start = System.nanoTime(); // compare with iterator values - final ReusingBuildFirstHashMatchIterator<Record, Record, Record> iterator = - new ReusingBuildFirstHashMatchIterator<Record, Record, Record>( + final ReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator = + new ReusingBuildFirstHashMatchIterator<>( input1, input2, this.serializer1.getSerializer(), this.comparator1, this.serializer2.getSerializer(), this.comparator2, this.pairComparator11, this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, true); @@ -209,21 +206,21 @@ public class HashVsSortMiniBenchmark { @Test public void testBuildSecond() { try { - Generator generator1 = new Generator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); - Generator generator2 = new Generator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); - final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE); - final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE); + final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE); + final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE); - final JoinFunction matcher = new NoOpMatcher(); + final FlatJoinFunction matcher = new NoOpMatcher(); - final Collector<Record> collector = new DiscardingOutputCollector<Record>(); + final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>(); long start = System.nanoTime(); // compare with iterator values - ReusingBuildSecondHashMatchIterator<Record, Record, Record> iterator = - new ReusingBuildSecondHashMatchIterator<Record, Record, Record>( + ReusingBuildSecondHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator = + new ReusingBuildSecondHashMatchIterator<>( input1, input2, this.serializer1.getSerializer(), this.comparator1, this.serializer2.getSerializer(), this.comparator2, this.pairComparator11, this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, true); @@ -246,11 +243,11 @@ public class HashVsSortMiniBenchmark { } - private static final class NoOpMatcher extends JoinFunction { + private static final class NoOpMatcher implements FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> { private static final long serialVersionUID = 1L; @Override - public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception { + public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception { } } }