Thanks, Fabian.
In this case, I could just extend your idea by creating some deterministic
multiplier of the subtask index:

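      RichMapFunction<String, Tuple2<Integer, String>> keyByMap =
              new RichMapFunction<String, Tuple2<Integer, String>>() {
          @Override
          public Tuple2<Integer, String> map(String value) {
              // floorMod stays in [0, 4) even when hashCode() is negative
              // (Math.abs(Integer.MIN_VALUE) is still negative).
              int indexOfCounter = Math.floorMod(value.hashCode(), 4);
              // Offset the counter index by the parallelism so that each
              // subtask produces its own, non-overlapping set of keys.
              int key = getRuntimeContext().getIndexOfThisSubtask()
                      + indexOfCounter
                      * getRuntimeContext().getNumberOfParallelSubtasks();
              counters.get(key).add(1);
              return new Tuple2<>(key, value);
          }
      };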

With this idea, if there are 12 subtasks, then subtask 0 would create 4
keys: 0, 12, 24, and 36.
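
To double-check that arithmetic, here is a minimal sketch in plain Java (no Flink dependency). It assumes the key is the subtask index plus the counter index times the parallelism, which is one formula that gives 0, 12, 24, and 36 for subtask 0. The class and method names are hypothetical and only serve the illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SubtaskKeys {
    // Assumed formula: offset the counter index by the parallelism.
    static int keyFor(int subtaskIndex, int counterIndex, int parallelism) {
        return subtaskIndex + counterIndex * parallelism;
    }

    // All keys that a single subtask can emit.
    static List<Integer> keysFor(int subtaskIndex, int parallelism, int counters) {
        List<Integer> keys = new ArrayList<>();
        for (int c = 0; c < counters; c++) {
            keys.add(keyFor(subtaskIndex, c, parallelism));
        }
        return keys;
    }

    // True if no key is produced by more than one subtask.
    static boolean allDisjoint(int parallelism, int counters) {
        Set<Integer> seen = new HashSet<>();
        for (int s = 0; s < parallelism; s++) {
            for (int key : keysFor(s, parallelism, counters)) {
                if (!seen.add(key)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(keysFor(0, 12, 4));   // [0, 12, 24, 36]
        System.out.println(keysFor(1, 12, 4));   // [1, 13, 25, 37]
        System.out.println(allDisjoint(12, 4));  // true
    }
}
```

This only shows that the key sets are disjoint across subtasks. It says nothing about where the records end up after a keyBy.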

The big advantage of your idea was that it would keep the data local. Is
this still true with my example here (where I'm applying a function to the
subtask index)? That is, if each partition generates a set of keys that is
unique to that subtask, will Flink keep that set of keys local for the next
downstream subtask?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Strange-behavior-of-DataStream-countWindow-tp7482p13978.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
