I’m playing with the (Window)WordCount example from the Flink quickstart. I
generate a DataStream of 20 strings of 50 digits each (1000 digits in total,
the first 1000 decimals of π), split it into single digits, and window the
result with a tumbling count window of 50 elements:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Random;

public class DigitCount {


    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.fromElements(
                "14159265358979323846264338327950288419716939937510",
                "58209749445923078164062862089986280348253421170679",
                "82148086513282306647093844609550582231725359408128",
                "48111745028410270193852110555964462294895493038196",
                "44288109756659334461284756482337867831652712019091",
                "45648566923460348610454326648213393607260249141273",
                "72458700660631558817488152092096282925409171536436",
                "78925903600113305305488204665213841469519415116094",
                "33057270365759591953092186117381932611793105118548",
                "07446237996274956735188575272489122793818301194912",
                "98336733624406566430860213949463952247371907021798",
                "60943702770539217176293176752384674818467669405132",
                "00056812714526356082778577134275778960917363717872",
                "14684409012249534301465495853710507922796892589235",
                "42019956112129021960864034418159813629774771309960",
                "51870721134999999837297804995105973173281609631859",
                "50244594553469083026425223082533446850352619311881",
                "71010003137838752886587533208381420617177669147303",
                "59825349042875546873115956286388235378759375195778",
                "18577805321712268066130019278766111959092164201989"
        );

        DataStream<Tuple2<Integer, Integer>> digitCount = text
                .flatMap(new Splitter())
                .keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
                    @Override
                    public Integer getKey(Tuple2<Integer, Integer> x) throws Exception {
                        // key = parity of the digit: 0 for even, 1 for odd
                        return x.f0 % 2;
                    }
                })
                .countWindow(50)
                .sum(1);

        digitCount.print();
        env.execute();

    }

    public static final class Splitter
            implements FlatMapFunction<String, Tuple2<Integer, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) {
            // emit (digit, 1) for every digit character in the string
            for (String token : value.split("")) {
                if (token.length() == 0) {
                    continue;
                }
                out.collect(Tuple2.of(Integer.parseInt(token), 1));
            }
        }
    }
}

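The code above produces 19 lines of output, which is reasonable: the key
selector splits the 1000 digits into 2 keys (even and odd), one with slightly
more than 500 elements and the other with slightly fewer. A key fires a window
only after it has collected 50 elements, so one key fires 10 windows, the
other fires 9, and the leftover elements of each key never complete a window.
In effect, one 50-digit window is lost.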
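Assuming each record receives exactly one key, the count of 19 is just integer division per key. A minimal plain-Java sketch of that arithmetic (no Flink dependency; the class name `CountWindowMath` and the 510/490 split are hypothetical, chosen only for illustration):

```java
// Sketch: how many tumbling count windows fire for a given number of
// elements per key, assuming every element is assigned exactly one key.
public class CountWindowMath {

    // A count window of size `windowSize` fires once per `windowSize`
    // elements of a single key; a key's leftover elements (fewer than
    // windowSize) never fire on a bounded input like this one.
    static int firedWindows(int[] elementsPerKey, int windowSize) {
        int fired = 0;
        for (int n : elementsPerKey) {
            fired += n / windowSize; // integer division = floor for n >= 0
        }
        return fired;
    }

    public static void main(String[] args) {
        // Hypothetical split of 1000 digits into an even key and an odd key.
        int[] split = {510, 490};
        System.out.println(firedWindows(split, 50)); // 10 + 9 = 19
    }
}
```

Only an exact 500/500 split would yield 20 windows; any other two-way split of 1000 elements yields 19.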

So far, so good. But if I replace the modulo KeySelector with a random one:

private static class RandomKeySelector<T> implements KeySelector<T, Integer> {
    private int nPartitions;
    private Random random;

    RandomKeySelector(int nPartitions) {
        this.nPartitions = nPartitions;
        random = new Random();
    }

    @Override
    public Integer getKey(T dummy) throws Exception {
        return random.nextInt(this.nPartitions);
    }
}
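One property of `RandomKeySelector` worth checking: its result does not depend on its argument, so calling it twice on the same record can return two different keys. Flink's `KeySelector` Javadoc asks for the opposite (repeated calls on the same object must return the same key). The sketch below is a plain-Java stand-in under a hypothetical name, `RandomKeyDemo`, with a fixed seed so it needs no Flink classes; whether Flink actually invokes the selector more than once per record in this job is an assumption to verify, not something established in this thread.

```java
import java.util.Random;

// Plain-Java stand-in for RandomKeySelector (hypothetical, no Flink
// dependency), used only to show that the key is not a function of the input.
public class RandomKeyDemo {
    private final int nPartitions;
    private final Random random;

    RandomKeyDemo(int nPartitions, long seed) {
        this.nPartitions = nPartitions;
        this.random = new Random(seed); // fixed seed so the demo is repeatable
    }

    // Same logic as RandomKeySelector.getKey: the element is ignored.
    int getKey(Object element) {
        return random.nextInt(nPartitions);
    }

    // Calls getKey repeatedly on one element and reports whether it ever
    // returned a key different from the first call.
    static boolean keyChangesForSameElement(int nPartitions, long seed, int calls) {
        RandomKeyDemo selector = new RandomKeyDemo(nPartitions, seed);
        Object element = "same element";
        int first = selector.getKey(element);
        for (int i = 1; i < calls; i++) {
            if (selector.getKey(element) != first) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(keyChangesForSameElement(2, 42L, 100));
    }
}
```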

and then use it in the pipeline:

.keyBy(new RandomKeySelector<Tuple2<Integer, Integer>>(2))

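the job may emit only 17 or 18 lines of output. How can that happen? With two
keys, each key can leave at most 49 elements in an unfinished window, so at
least 19 windows should fire. Moreover, if I set the number of partitions to
10, there should in theory be no fewer than 11 lines of output, but sometimes
there are only 9.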
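The "no fewer than 11" expectation can be made precise. With N elements, k keys and window size W, each key strands at most W - 1 elements, so at least ceil((N - k(W - 1)) / W) windows must fire: ceil(902 / 50) = 19 for two keys and ceil(510 / 50) = 11 for ten keys. The sketch below (hypothetical class `CountWindowBound`) computes this bound and simulates random keying in which each element's key is drawn exactly once. Under that assumption the count never drops below the bound, so 17, 18 or 9 windows cannot come from an uneven split alone; a selector backed by `java.util.Random` does not guarantee that each element keeps one key.

```java
import java.util.Random;

// Sketch: lower bound on fired count windows when every element gets
// exactly one key, plus a simulation that draws each element's key once.
public class CountWindowBound {

    // ceil((total - keys * (windowSize - 1)) / windowSize), clamped at 0:
    // each key can strand at most windowSize - 1 elements.
    static int minFiredWindows(int total, int keys, int windowSize) {
        int maxStranded = keys * (windowSize - 1);
        int remaining = Math.max(0, total - maxStranded);
        return (remaining + windowSize - 1) / windowSize; // ceiling division
    }

    // Assigns each element one random key, then counts fired windows.
    static int simulate(int total, int keys, int windowSize, long seed) {
        Random random = new Random(seed);
        int[] perKey = new int[keys];
        for (int i = 0; i < total; i++) {
            perKey[random.nextInt(keys)]++;
        }
        int fired = 0;
        for (int n : perKey) {
            fired += n / windowSize;
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(minFiredWindows(1000, 2, 50));  // 19
        System.out.println(minFiredWindows(1000, 10, 50)); // 11
        int lowest = Integer.MAX_VALUE;
        for (long seed = 0; seed < 1000; seed++) {
            lowest = Math.min(lowest, simulate(1000, 10, 50, seed));
        }
        System.out.println(lowest >= 11); // true: never below the bound
    }
}
```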

Please help me understand why countWindow behaves like this.
