[FLINK-2105] Add support for sorted but sparse test data generation
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db0b0087
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db0b0087
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db0b0087

Branch: refs/heads/master
Commit: db0b0087b02985f55bcc6e65571b11ca33b0886f
Parents: 0dc6849
Author: Johann Kovacs <m...@jkovacs.de>
Authored: Fri Jul 10 17:26:05 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue Aug 4 21:35:27 2015 +0200

----------------------------------------------------------------------
 .../runtime/operators/testutils/TestData.java | 273 +++++++++++++++++++
 1 file changed, 273 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/db0b0087/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
index fd34a3b..8688d4e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.operators.testutils;
 import java.util.Comparator;
 import java.util.Random;
 
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
@@ -279,6 +281,233 @@ public final class TestData {
 			this.counter = 0;
 		}
 	}
+
+	/**
+	 * Generates {@code Tuple2<Integer, String>} records from a seeded {@link Random}: equal seeds
+	 * yield equal sequences, and {@link #reset()} restarts the sequence from the beginning.
+	 *
+	 * <p>Keys are produced according to the {@link KeyMode}:
+	 * <ul>
+	 *   <li>{@code SORTED}: 1, 2, 3, ...</li>
+	 *   <li>{@code SORTED_SPARSE}: strictly increasing; each step is drawn from
+	 *       {@code [1, (int) (1 / keyDensity)]}, so a lower density leaves larger gaps.</li>
+	 *   <li>{@code RANDOM}: drawn from {@code [1, keyMax]} (for a positive {@code keyMax}).</li>
+	 * </ul>
+	 *
+	 * <p>Values are produced according to the {@link ValueMode}:
+	 * <ul>
+	 *   <li>{@code FIX_LENGTH}: random strings of exactly {@code valueLength} characters.</li>
+	 *   <li>{@code RANDOM_LENGTH}: random strings of {@code valueLength} characters minus a random
+	 *       amount in {@code [0, valueLength / 3)}; a {@code valueLength} below 3 is used unchanged.</li>
+	 *   <li>{@code CONSTANT}: the {@code constant} given to the constructor (may be {@code null}).</li>
+	 * </ul>
+	 */
+	public static class TupleGenerator implements MutableObjectIterator<Tuple2<Integer, String>> {
+
+		/** Strategy used to generate keys. */
+		public enum KeyMode {
+			SORTED, RANDOM, SORTED_SPARSE
+		}
+
+		/** Strategy used to generate values. */
+		public enum ValueMode {
+			FIX_LENGTH, RANDOM_LENGTH, CONSTANT
+		}
+
+		// Alphabet for random values: only 'a'-'m' occur (each listed twice). Changing this table
+		// changes the strings generated for every seed.
+		private static final char[] alpha = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'a', 'b', 'c',
+			'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm' };
+
+		private final long seed;
+
+		private final int keyMax;
+
+		private final float keyDensity;
+
+		private final int valueLength;
+
+		private final KeyMode keyMode;
+
+		private final ValueMode valueMode;
+
+		private Random random;
+
+		// Last key handed out in SORTED / SORTED_SPARSE mode.
+		private int counter;
+
+		// The constant in CONSTANT mode; otherwise replaced by a fresh random string per record.
+		private String value;
+
+		/**
+		 * Creates a generator with {@link KeyMode#RANDOM} keys and {@link ValueMode#FIX_LENGTH} values.
+		 */
+		public TupleGenerator(long seed, int keyMax, int valueLength) {
+			this(seed, keyMax, valueLength, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+		}
+
+		/**
+		 * Creates a generator without a constant, so {@link ValueMode#CONSTANT} yields {@code null} values.
+		 */
+		public TupleGenerator(long seed, int keyMax, int valueLength, KeyMode keyMode, ValueMode valueMode) {
+			this(seed, keyMax, valueLength, keyMode, valueMode, null);
+		}
+
+		/**
+		 * Creates a generator with a key density of 1 ({@code SORTED_SPARSE} then yields the same keys as {@code SORTED}).
+		 */
+		public TupleGenerator(long seed, int keyMax, int valueLength, KeyMode keyMode, ValueMode valueMode, String constant) {
+			this(seed, keyMax, 1.0f, valueLength, keyMode, valueMode, constant);
+		}
+
+		/**
+		 * Creates a fully configured generator.
+		 *
+		 * @param seed seed of the random source; equal seeds produce equal sequences
+		 * @param keyMax upper bound of the keys in {@link KeyMode#RANDOM} mode
+		 * @param keyDensity in {@link KeyMode#SORTED_SPARSE} mode, key steps are drawn from [1, (int) (1 / keyDensity)]
+		 * @param valueLength length (upper bound in {@link ValueMode#RANDOM_LENGTH} mode) of the generated values
+		 * @param keyMode how keys are generated
+		 * @param valueMode how values are generated
+		 * @param constant value emitted in {@link ValueMode#CONSTANT} mode; may be {@code null}
+		 * @throws IllegalArgumentException if {@code keyMode} is SORTED_SPARSE and {@code keyDensity} is not in (0, 1]
+		 */
+		public TupleGenerator(long seed, int keyMax, float keyDensity, int valueLength, KeyMode keyMode, ValueMode valueMode, String constant) {
+			// Validate eagerly so an invalid density fails here with a clear message instead of
+			// misbehaving later in nextKey() (non-positive Random#nextInt bound or overflowing keys).
+			if (keyMode == KeyMode.SORTED_SPARSE && !(keyDensity > 0.0f && keyDensity <= 1.0f)) {
+				throw new IllegalArgumentException(
+					"keyDensity must be in (0, 1] for SORTED_SPARSE keys, but was " + keyDensity);
+			}
+			this.seed = seed;
+			this.keyMax = keyMax;
+			this.keyDensity = keyDensity;
+			this.valueLength = valueLength;
+			this.keyMode = keyMode;
+			this.valueMode = valueMode;
+
+			this.random = new Random(seed);
+			this.counter = 0;
+
+			this.value = constant;
+		}
+
+		@Override
+		public Tuple2<Integer, String> next(Tuple2<Integer, String> reuse) {
+			int key = nextKey();
+			if (this.valueMode != ValueMode.CONSTANT) {
+				this.value = randomString();
+			}
+			reuse.setFields(key, this.value);
+			return reuse;
+		}
+
+		@Override
+		public Tuple2<Integer, String> next() {
+			return next(new Tuple2<Integer, String>());
+		}
+
+		/**
+		 * Writes the next key into {@code target[0]} and a random int into {@code target[1]}; both
+		 * entries must be {@link IntValue}s. No string value is generated by this variant.
+		 *
+		 * @return always {@code true}
+		 */
+		public boolean next(org.apache.flink.types.Value[] target) {
+			// TODO change this to something proper
+			((IntValue) target[0]).setValue(nextKey());
+			((IntValue) target[1]).setValue(random.nextInt());
+			return true;
+		}
+
+		private int nextKey() {
+			if (keyMode == KeyMode.SORTED) {
+				return ++counter;
+			} else if (keyMode == KeyMode.SORTED_SPARSE) {
+				// Advance by a step in [1, maxStep]; the constructor ensures maxStep >= 1.
+				int maxStep = (int) (1 / keyDensity);
+				counter += random.nextInt(maxStep) + 1;
+				return counter;
+			} else {
+				// |nextInt() % keyMax| < |keyMax|, so for a positive keyMax the key lies in [1, keyMax].
+				return Math.abs(random.nextInt() % keyMax) + 1;
+			}
+		}
+
+		/** Re-seeds the random source and resets the key counter, restarting the generated sequence. */
+		public void reset() {
+			this.random = new Random(seed);
+			this.counter = 0;
+		}
+
+		private String randomString() {
+			int length;
+			if (valueMode == ValueMode.FIX_LENGTH) {
+				length = valueLength;
+			} else {
+				// Shorten by a random amount in [0, valueLength / 3). Random#nextInt requires a positive
+				// bound, so when valueLength / 3 is not positive the length is left unchanged.
+				int maxShrink = valueLength / 3;
+				length = maxShrink > 0 ? valueLength - random.nextInt(maxShrink) : valueLength;
+			}
+
+			StringBuilder sb = new StringBuilder();
+			for (int i = 0; i < length; i++) {
+				sb.append(alpha[random.nextInt(alpha.length)]);
+			}
+			return sb.toString();
+		}
+	}
+
+	/**
+	 * Iterator returning the next {@code numberOfRecords} tuples of a {@link TupleGenerator}, followed
+	 * by {@code null}. The generator is reset once, when this iterator is created.
+	 */
+	public static class TupleGeneratorIterator implements MutableObjectIterator<Tuple2<Integer, String>> {
+
+		private final TupleGenerator generator;
+
+		private final int numberOfRecords;
+
+		private int counter;
+
+		public TupleGeneratorIterator(TupleGenerator generator, int numberOfRecords) {
+			this.generator = generator;
+			this.generator.reset();
+			this.numberOfRecords = numberOfRecords;
+			this.counter = 0;
+		}
+
+		@Override
+		public Tuple2<Integer, String> next(Tuple2<Integer, String> target) {
+			if (counter < numberOfRecords) {
+				counter++;
+				return generator.next(target);
+			}
+			else {
+				return null;
+			}
+		}
+
+		@Override
+		public Tuple2<Integer, String> next() {
+			if (counter < numberOfRecords) {
+				counter++;
+				return generator.next();
+			}
+			else {
+				return null;
+			}
+		}
+
+		/**
+		 * Allows another {@code numberOfRecords} tuples to be returned. The generator itself is not
+		 * reset, so subsequent tuples continue its sequence instead of repeating it.
+		 */
+		public void reset() {
+			this.counter = 0;
+		}
+	}
 
 	// --------------------------------------------------------------------------------------------
 
@@ -325,4 +554,48 @@ public final class TestData {
 			this.pos = 0;
 		}
 	}
+
+	/**
+	 * Iterator returning {@code numPairs} tuples that all carry the same key; the value of the i-th
+	 * tuple (counting from 0) is {@code valueValue + " " + i}. Returns {@code null} afterwards.
+	 */
+	public static class TupleConstantValueIterator implements MutableObjectIterator<Tuple2<Integer, String>> {
+
+		private final int key;
+
+		private final String valueValue;
+
+		private final int numPairs;
+
+		private int pos;
+
+		public TupleConstantValueIterator(int keyValue, String valueValue, int numPairs) {
+			this.key = keyValue;
+			this.valueValue = valueValue;
+			this.numPairs = numPairs;
+			this.pos = 0;
+		}
+
+		@Override
+		public Tuple2<Integer, String> next(Tuple2<Integer, String> reuse) {
+			if (pos < this.numPairs) {
+				reuse.setFields(this.key, this.valueValue + ' ' + pos);
+				pos++;
+				return reuse;
+			}
+			else {
+				return null;
+			}
+		}
+
+		@Override
+		public Tuple2<Integer, String> next() {
+			return next(new Tuple2<Integer, String>());
+		}
+
+		/** Restarts the iteration at the first tuple. */
+		public void reset() {
+			this.pos = 0;
+		}
+	}
 }