Hi Fabian - I've tried this idea of creating a KeyedStream based on getRuntimeContext().getIndexOfThisSubtask(). However, not all target subtasks are receiving records.
All operators have a parallelism of 12, so I have 12 source subtasks and 12 target subtasks. I've confirmed that the values returned by getIndexOfThisSubtask() are evenly distributed between 0 and 11. However, 4 of the 12 target subtasks (the subtasks after the hash) are not receiving any data. This means the data is not all being kept local, because at least 4 of the 12 partitions could be getting sent to different TaskManagers. Do I need to use .partitionCustom to ensure an even, local distribution?

Thanks,
Edward
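
P.S. In case it helps frame the question, here is a minimal sketch of what I suspect is happening. It assumes keyBy hashes each key into a key group in [0, maxParallelism) and then maps the key group to a subtask as keyGroup * parallelism / maxParallelism. The class and method names are made up for illustration, mix() is a stand-in hash rather than Flink's actual function, and maxParallelism = 128 is an assumed value, so the exact subtasks hit will differ from a real job.

```java
import java.util.Set;
import java.util.TreeSet;

public class KeyRoutingSketch {

    // Stand-in integer hash (murmur3-style finalizer). NOT Flink's exact hash.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Assumed keyBy-style routing: key -> key group -> target subtask.
    static int hashedTarget(int key, int maxParallelism, int parallelism) {
        int keyGroup = Math.floorMod(mix(Integer.hashCode(key)), maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    // What a custom partitioner could do instead: use the subtask index directly.
    static int directTarget(int key, int numPartitions) {
        return Math.floorMod(key, numPartitions);
    }

    // Distinct target subtasks reached by keys 0..numKeys-1 under hashed routing.
    static Set<Integer> hashedTargets(int numKeys, int maxParallelism, int parallelism) {
        Set<Integer> targets = new TreeSet<>();
        for (int key = 0; key < numKeys; key++) {
            targets.add(hashedTarget(key, maxParallelism, parallelism));
        }
        return targets;
    }

    // Distinct target subtasks reached by keys 0..numKeys-1 under direct routing.
    static Set<Integer> directTargets(int numKeys, int numPartitions) {
        Set<Integer> targets = new TreeSet<>();
        for (int key = 0; key < numKeys; key++) {
            targets.add(directTarget(key, numPartitions));
        }
        return targets;
    }

    public static void main(String[] args) {
        int parallelism = 12;
        int maxParallelism = 128; // assumed value for this sketch
        System.out.println("hashed targets: " + hashedTargets(12, maxParallelism, parallelism));
        System.out.println("direct targets: " + directTargets(12, parallelism));
    }
}
```

If this model is right, 12 distinct keys can easily collide on the same target subtask after hashing, which would explain the idle subtasks. The directTarget logic is what I would put in a Partitioner<Integer> passed to partitionCustom, although as far as I can tell that returns a plain DataStream rather than a KeyedStream, so keyed windows would not be available on it.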