Hi all,

I have a Kafka topic with two partitions; the messages within each partition are ordered by ascending timestamp.
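To make the per-partition ordering assumption concrete, here is a minimal, Flink-free Java sketch. The class and method names are hypothetical, not Flink API. It assumes that records are handed from the source subtasks to the downstream subtasks round-robin, which is how Flink's rebalance partitioning distributes records. The lockstep emission order and every source starting at subtask 0 are further simplifications. Each partition is ascending on its own, but once two partitions are interleaved into the same subtask, a check that keeps the highest timestamp seen so far (similar in spirit to an ascending-timestamp check) starts flagging records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model: each inner list is one Kafka partition
// (read by one source subtask), already in ascending timestamp order.
public class RoundRobinMonotony {

    // Assumed round-robin hand-over: every source subtask sends its next record
    // to the next downstream subtask in turn. Simplifications: sources emit in
    // lockstep (record k of every source before record k+1), and every source's
    // round-robin pointer starts at downstream subtask 0.
    public static List<List<Long>> roundRobin(List<List<Long>> sources, int downstream) {
        List<List<Long>> received = new ArrayList<>();
        for (int d = 0; d < downstream; d++) {
            received.add(new ArrayList<>());
        }
        int[] next = new int[sources.size()];
        int longest = 0;
        for (List<Long> src : sources) {
            longest = Math.max(longest, src.size());
        }
        for (int k = 0; k < longest; k++) {
            for (int s = 0; s < sources.size(); s++) {
                List<Long> src = sources.get(s);
                if (k < src.size()) {
                    received.get(next[s]).add(src.get(k));
                    next[s] = (next[s] + 1) % downstream;
                }
            }
        }
        return received;
    }

    // Counts records whose timestamp is lower than the highest timestamp seen
    // so far by the same subtask, i.e. what an ascending-order check would flag.
    public static int countViolations(List<Long> timestamps) {
        int violations = 0;
        long highest = Long.MIN_VALUE;
        for (long ts : timestamps) {
            if (ts < highest) {
                violations++;
            } else {
                highest = ts;
            }
        }
        return violations;
    }

    public static void main(String[] args) {
        List<List<Long>> partitions = List.of(
                List.of(10L, 20L, 30L, 40L),  // partition 0
                List.of(1L, 2L, 3L, 4L));     // partition 1

        // One checker per partition: each sees an ascending sequence.
        for (List<Long> p : partitions) {
            System.out.println(p + " -> violations: " + countViolations(p));
        }

        // Round-robin into 3 downstream subtasks: the partitions get interleaved.
        List<List<Long>> received = roundRobin(partitions, 3);
        for (List<Long> r : received) {
            System.out.println(r + " -> violations: " + countViolations(r));
        }
    }
}
```

Run as-is, each partition checked on its own reports zero violations, while every one of the three round-robin subtasks receives records from both partitions and reports at least one.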
The following code works correctly (I'm running it on my local machine, where the default parallelism is the number of cores, i.e. 8):

    stream = env.addSource(myFlinkKafkaConsumer09)
    stream.map(mapper)
      .assignTimestampsAndWatermarks(ascendingTimestampExtractor)
      .keyBy(0)
      .timeWindow(Time.minutes(10))
      .reduce(reducer)
      .print()

But if I explicitly set env.addSource(myFlinkKafkaConsumer09).setParallelism(n), where n > 2 (the number of partitions) and n != 8, I get a bunch of "Timestamp monotony violated" warnings.

My understanding is that only 2 of the source instances will be assigned topic partitions, and since messages are ordered within each partition, timestamp assignment should work correctly regardless of the parallelism, as long as it is >= 2.

*Question 1*: What is the explanation for this?

Now I add another, empty partition to the topic. The job doesn't produce any output anymore, which is expected, since it keeps waiting forever for the empty partition's watermark. What I don't understand, though, is a strange behavior when I set the parallelism explicitly at the source:

*Question 2*: Why do I get output if I explicitly set env.addSource(myFlinkKafkaConsumer09).setParallelism(n)? Shouldn't the empty-partition argument apply here too? And why is that output seen only when n != 8?

Best,
Yassine