I’m playing with the (Window)WordCount example from the Flink QuickStart. I generate a DataStream of 1000 random digits (20 strings of 50 digits each, split into single digits by a flatMap), which is keyed and then windowed with a tumbling count window of 50 elements:
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    import java.util.Random;

    public class DigitCount {

        public static void main(String[] args) throws Exception {

            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // 20 strings of 50 digits each, 1000 digits in total
            DataStream<String> text = env.fromElements(
                    "14159265358979323846264338327950288419716939937510",
                    "58209749445923078164062862089986280348253421170679",
                    "82148086513282306647093844609550582231725359408128",
                    "48111745028410270193852110555964462294895493038196",
                    "44288109756659334461284756482337867831652712019091",
                    "45648566923460348610454326648213393607260249141273",
                    "72458700660631558817488152092096282925409171536436",
                    "78925903600113305305488204665213841469519415116094",
                    "33057270365759591953092186117381932611793105118548",
                    "07446237996274956735188575272489122793818301194912",
                    "98336733624406566430860213949463952247371907021798",
                    "60943702770539217176293176752384674818467669405132",
                    "00056812714526356082778577134275778960917363717872",
                    "14684409012249534301465495853710507922796892589235",
                    "42019956112129021960864034418159813629774771309960",
                    "51870721134999999837297804995105973173281609631859",
                    "50244594553469083026425223082533446850352619311881",
                    "71010003137838752886587533208381420617177669147303",
                    "59825349042875546873115956286388235378759375195778",
                    "18577805321712268066130019278766111959092164201989"
            );

            DataStream<Tuple2<Integer, Integer>> digitCount = text
                    .flatMap(new Splitter())
                    // key by parity of the digit: 0 = even, 1 = odd
                    .keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
                        @Override
                        public Integer getKey(Tuple2<Integer, Integer> x) throws Exception {
                            return x.f0 % 2;
                        }
                    })
                    .countWindow(50)
                    .sum(1);

            digitCount.print();
            env.execute();
        }

        // Splits each string into single digits and emits (digit, 1) per digit
        public static final class Splitter implements FlatMapFunction<String, Tuple2<Integer, Integer>> {
            @Override
            public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) {
                for (String token : value.split("")) {
                    if (token.length() == 0) {
                        continue;
                    }
                    out.collect(Tuple2.of(Integer.parseInt(token), 1));
                }
            }
        }
    }

The code above produces 19 lines of output, which is reasonable: the 1000 digits are keyed into 2 partitions, one containing somewhat more than 500 elements and the other somewhat fewer than 500, so one 50-digit window never fills up and is never emitted.

So far so good, but if I replace the mod KeySelector with a random one:

    private static class RandomKeySelector<T> implements KeySelector<T, Integer> {
        private int nPartitions;
        private Random random;

        RandomKeySelector(int nPartitions) {
            this.nPartitions = nPartitions;
            random = new Random();
        }

        // Ignores the element and returns a random key in [0, nPartitions)
        @Override
        public Integer getKey(T dummy) throws Exception {
            return random.nextInt(this.nPartitions);
        }
    }

and then use

    .keyBy(new RandomKeySelector<Tuple2<Integer, Integer>>(2))

it may generate 17 or 18 lines of output. How could that happen? Moreover, if I set the number of partitions to 10, in theory there should be no fewer than 11 lines of output, but in practice I sometimes get only 9. Please help me understand why countWindow behaves like this.
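
To make the bound I'm assuming explicit, here is a minimal sketch in plain Java (no Flink involved; the class and method names are made up for illustration). It assumes every element ends up in exactly one key's window, so each key can leave at most 49 elements behind in a window that never fires:

```java
// Hypothetical helper, not part of Flink: lower bound on the number of
// count windows that fire when every element belongs to exactly one key.
class WindowBound {

    // Each of the k keys can strand at most (w - 1) elements in an
    // unfinished window, so at least n - k * (w - 1) elements must sit
    // in full windows. Round up because the window count is an integer.
    static int minFullWindows(int n, int k, int w) {
        int guaranteed = n - k * (w - 1);
        if (guaranteed <= 0) {
            return 0;
        }
        return (guaranteed + w - 1) / w; // ceiling division
    }

    public static void main(String[] args) {
        System.out.println(minFullWindows(1000, 2, 50));  // prints 19
        System.out.println(minFullWindows(1000, 10, 50)); // prints 11
    }
}
```

The 17, 18, and 9 lines I observe with the random key selector are below these bounds, which is the part I can't explain.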