Thanks, Fabian.
In this case, I could just extend your idea by deriving the keys
deterministically from the subtask index:
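    RichMapFunction<String, Tuple2<Integer, String>> keyByMap =
            new RichMapFunction<String, Tuple2<Integer, String>>() {
        @Override
        public Tuple2<Integer, String> map(String value) {
            // floorMod keeps the result in [0, 4), even for negative hash codes
            int indexOfCounter = Math.floorMod(value.hashCode(), 4);
            // offset each counter by the parallelism so that every subtask
            // produces keys no other subtask can produce
            int key = getRuntimeContext().getIndexOfThisSubtask()
                    + indexOfCounter * getRuntimeContext().getNumberOfParallelSubtasks();
            counters.get(key).add(1);
            return new Tuple2<>(key, value);
        }
    };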
With this idea, if there are 12 subtasks, then subtask 0 would create 4
keys: 0, 12, 24, and 36.
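To double-check the arithmetic above, here is a minimal standalone sketch (no Flink dependency). It assumes the key is computed as subtaskIndex + counterIndex * parallelism, which is the formula that yields 0, 12, 24, and 36 for subtask 0 of 12. The class and method names (SubtaskKeys, keyFor, keysFor, allKeysDistinct) are made up for illustration only.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class SubtaskKeys {
    // Assumed key scheme: offset each counter index by the parallelism.
    static int keyFor(int subtaskIndex, int counterIndex, int parallelism) {
        return subtaskIndex + counterIndex * parallelism;
    }

    // All keys a single subtask can emit under the assumed scheme.
    static List<Integer> keysFor(int subtaskIndex, int countersPerSubtask, int parallelism) {
        List<Integer> keys = new ArrayList<>();
        for (int c = 0; c < countersPerSubtask; c++) {
            keys.add(keyFor(subtaskIndex, c, parallelism));
        }
        return keys;
    }

    // Returns true if no key is produced by more than one (subtask, counter) pair.
    static boolean allKeysDistinct(int countersPerSubtask, int parallelism) {
        Set<Integer> seen = new HashSet<>();
        for (int s = 0; s < parallelism; s++) {
            for (int key : keysFor(s, countersPerSubtask, parallelism)) {
                if (!seen.add(key)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(keysFor(0, 4, 12));      // [0, 12, 24, 36]
        System.out.println(keysFor(1, 4, 12));      // [1, 13, 25, 37]
        System.out.println(allKeysDistinct(4, 12)); // true
    }
}
```

This only checks that the key sets of different subtasks do not overlap. It says nothing about which downstream subtask a given key is routed to after a keyBy.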
The big advantage of your idea was that it would keep the data local. Is
this still true with my example here (where I'm applying a function to the
subtask index)? That is, if each partition generates a set of keys that is
unique to that subtask, will Flink keep that set of keys local to the next
downstream subtask?