[FLINK-2105] Add support for sorted but sparse test data generation

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db0b0087
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db0b0087
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db0b0087

Branch: refs/heads/master
Commit: db0b0087b02985f55bcc6e65571b11ca33b0886f
Parents: 0dc6849
Author: Johann Kovacs <m...@jkovacs.de>
Authored: Fri Jul 10 17:26:05 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue Aug 4 21:35:27 2015 +0200

----------------------------------------------------------------------
 .../runtime/operators/testutils/TestData.java   | 207 +++++++++++++++++++
 1 file changed, 207 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/db0b0087/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
index fd34a3b..8688d4e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.operators.testutils;
 import java.util.Comparator;
 import java.util.Random;
 
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
@@ -279,6 +281,169 @@ public final class TestData {
                        this.counter = 0;
                }
        }
+
+       /**
+        * Tuple2<Integer, String> generator.
+        */
+       public static class TupleGenerator implements 
MutableObjectIterator<Tuple2<Integer, String>> {
+
+               public enum KeyMode {
+                       SORTED, RANDOM, SORTED_SPARSE
+               };
+
+               public enum ValueMode {
+                       FIX_LENGTH, RANDOM_LENGTH, CONSTANT
+               };
+
+               private static char[] alpha = { 'a', 'b', 'c', 'd', 'e', 'f', 
'g', 'h', 'i', 'j', 'k', 'l', 'm', 'a', 'b', 'c',
+                               'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 
'm' };
+
+               private final long seed;
+
+               private final int keyMax;
+
+               private final float keyDensity;
+
+               private final int valueLength;
+
+               private final KeyMode keyMode;
+
+               private final ValueMode valueMode;
+
+               private Random random;
+
+               private int counter;
+
+               private int key;
+               private String value;
+
+               public TupleGenerator(long seed, int keyMax, int valueLength) {
+                       this(seed, keyMax, valueLength, KeyMode.RANDOM, 
ValueMode.FIX_LENGTH);
+               }
+
+               public TupleGenerator(long seed, int keyMax, int valueLength, 
KeyMode keyMode, ValueMode valueMode) {
+                       this(seed, keyMax, valueLength, keyMode, valueMode, 
null);
+               }
+
+               public TupleGenerator(long seed, int keyMax, int valueLength, 
KeyMode keyMode, ValueMode valueMode, String constant) {
+                       this(seed, keyMax, 1.0f, valueLength, keyMode, 
valueMode, constant);
+               }
+
+               public TupleGenerator(long seed, int keyMax, float keyDensity, 
int valueLength, KeyMode keyMode, ValueMode valueMode, String constant) {
+                       this.seed = seed;
+                       this.keyMax = keyMax;
+                       this.keyDensity = keyDensity;
+                       this.valueLength = valueLength;
+                       this.keyMode = keyMode;
+                       this.valueMode = valueMode;
+
+                       this.random = new Random(seed);
+                       this.counter = 0;
+
+                       this.value = constant == null ? null : constant;
+               }
+
+               public Tuple2<Integer, String> next(Tuple2<Integer, String> 
reuse) {
+                       this.key = nextKey();
+                       if (this.valueMode != ValueMode.CONSTANT) {
+                               this.value = randomString();
+                       }
+                       reuse.setFields(this.key, this.value);
+                       return reuse;
+               }
+
+               public Tuple2<Integer, String> next() {
+                       return next(new Tuple2<Integer, String>());
+               }
+
+               public boolean next(org.apache.flink.types.Value[] target) {
+                       this.key = nextKey();
+                       // TODO change this to something proper
+                       ((IntValue)target[0]).setValue(this.key);
+                       ((IntValue)target[1]).setValue(random.nextInt());
+                       return true;
+               }
+
+               private int nextKey() {
+                       if (keyMode == KeyMode.SORTED) {
+                               return ++counter;
+                       } else if (keyMode == KeyMode.SORTED_SPARSE) {
+                               int max = (int) (1 / keyDensity);
+                               counter += random.nextInt(max) + 1;
+                               return counter;
+                       } else {
+                               return Math.abs(random.nextInt() % keyMax) + 1;
+                       }
+               }
+
+               public void reset() {
+                       this.random = new Random(seed);
+                       this.counter = 0;
+               }
+
+               private String randomString() {
+                       int length;
+
+                       if (valueMode == ValueMode.FIX_LENGTH) {
+                               length = valueLength;
+                       } else {
+                               length = valueLength - 
random.nextInt(valueLength / 3);
+                       }
+
+                       StringBuilder sb = new StringBuilder();
+                       for (int i = 0; i < length; i++) {
+                               sb.append(alpha[random.nextInt(alpha.length)]);
+                       }
+                       return sb.toString();
+               }
+
+       }
+
+
+       /**
+        * Record reader mock.
+        */
+       public static class TupleGeneratorIterator implements 
MutableObjectIterator<Tuple2<Integer, String>> {
+
+               private final TupleGenerator generator;
+
+               private final int numberOfRecords;
+
+               private int counter;
+
+               public TupleGeneratorIterator(TupleGenerator generator, int 
numberOfRecords) {
+                       this.generator = generator;
+                       this.generator.reset();
+                       this.numberOfRecords = numberOfRecords;
+                       this.counter = 0;
+               }
+
+               @Override
+               public Tuple2<Integer, String> next(Tuple2<Integer, String> 
target) {
+                       if (counter < numberOfRecords) {
+                               counter++;
+                               return generator.next(target);
+                       }
+                       else {
+                               return null;
+                       }
+               }
+
+               @Override
+               public Tuple2<Integer, String> next() {
+                       if (counter < numberOfRecords) {
+                               counter++;
+                               return generator.next();
+                       }
+                       else {
+                               return null;
+                       }
+               }
+
+               public void reset() {
+                       this.counter = 0;
+               }
+       }
        
        // --------------------------------------------------------------------------------------------
        
@@ -325,4 +490,46 @@ public final class TestData {
                        this.pos = 0;
                }
        }
+
+       public static class TupleConstantValueIterator implements 
MutableObjectIterator<Tuple2<Integer, String>> {
+
+               private int key;
+               private String value;
+
+               private final String valueValue;
+
+
+               private final int numPairs;
+
+               private int pos;
+
+
+               public TupleConstantValueIterator(int keyValue, String 
valueValue, int numPairs) {
+                       this.key = keyValue;
+                       this.valueValue = valueValue;
+                       this.numPairs = numPairs;
+               }
+
+               @Override
+               public Tuple2<Integer, String> next(Tuple2<Integer, String> 
reuse) {
+                       if (pos < this.numPairs) {
+                               this.value = this.valueValue + ' ' + pos;
+                               reuse.setFields(this.key, this.value);
+                               pos++;
+                               return reuse;
+                       }
+                       else {
+                               return null;
+                       }
+               }
+
+               @Override
+               public Tuple2<Integer, String> next() {
+                       return next(new Tuple2<Integer, String>());
+               }
+
+               public void reset() {
+                       this.pos = 0;
+               }
+       }
 }

Reply via email to