Hi all,

I have a Kafka topic with two partitions. Messages within each partition
are ordered by ascending timestamp.

The following code works correctly (I'm running this on my local machine,
where the default parallelism is the number of cores, 8):

stream = env.addSource(myFlinkKafkaConsumer09)
stream.map(mapper)
  .assignTimestampsAndWatermarks(ascendingTimestampExtractor)
  .keyBy(0)
  .timeWindow(Time.minutes(10))
  .reduce(reducer)
  .print()

But if I explicitly set
env.addSource(myFlinkKafkaConsumer09).setParallelism(n),
where n > (number of partitions = 2) and n != 8, I get a bunch of "Timestamp
monotony violated" warnings. My understanding is that only 2 source instances
will be mapped to the topic partitions, and since messages are ordered within
each partition, timestamp assignment should happen correctly regardless of
the parallelism as long as it is >= 2.
*Question 1*: What explains these warnings?
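
To make my expectation concrete, here is a small standalone sketch in plain
Java (not Flink code; the class and method names are made up for
illustration). It assumes the warning is raised whenever an instance sees a
timestamp lower than the highest one it has seen so far. Each partition on
its own passes that check, while a sequence that mixes both partitions does
not have to:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone illustration with no Flink dependency. All names are hypothetical.
final class MonotonyCheck {

    // Counts how often a timestamp is lower than the highest one seen so far,
    // which is the situation I assume triggers the warning.
    static int countViolations(List<Long> timestamps) {
        long current = Long.MIN_VALUE;
        int violations = 0;
        for (long ts : timestamps) {
            if (ts < current) {
                violations++;
            } else {
                current = ts;
            }
        }
        return violations;
    }

    // Alternates the elements of two lists, standing in for a single instance
    // that receives records originating from both partitions.
    static List<Long> interleave(List<Long> a, List<Long> b) {
        List<Long> out = new ArrayList<>();
        int n = Math.max(a.size(), b.size());
        for (int i = 0; i < n; i++) {
            if (i < a.size()) out.add(a.get(i));
            if (i < b.size()) out.add(b.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> p0 = Arrays.asList(100L, 200L, 300L); // partition 0, ascending
        List<Long> p1 = Arrays.asList(10L, 20L, 30L);    // partition 1, ascending
        System.out.println(countViolations(p0));                 // 0
        System.out.println(countViolations(p1));                 // 0
        System.out.println(countViolations(interleave(p0, p1))); // 3
    }
}
```

So the warnings would make sense to me only if some instance ends up seeing
records from both partitions before the timestamps are assigned.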


Now I add another, empty partition to the topic. The job no longer produces any
output, which is expected since it keeps waiting forever for the
empty partition's watermark. What I don't understand, though, is a
strange behavior when I set the parallelism explicitly at the source:
*Question 2*: Why do I get output if I explicitly set
env.addSource(myFlinkKafkaConsumer09).setParallelism(n)?
Shouldn't the empty-partition argument apply here too? And why is that
output seen only when n != 8?
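
For reference, this is the mental model behind my "empty partition" argument,
written as a plain-Java sketch (again not Flink code; the names and numbers
are made up). It assumes an operator's event-time clock is the minimum of the
latest watermarks received on its inputs, and that a window can fire once
that clock reaches the window's last timestamp:

```java
import java.util.Arrays;

// Standalone illustration with no Flink dependency. All names are hypothetical.
final class MinWatermark {

    // The combined watermark is the minimum over all inputs. An input that
    // never emitted a watermark is represented by Long.MIN_VALUE.
    static long combined(long[] perInputWatermarks) {
        return Arrays.stream(perInputWatermarks).min().orElse(Long.MIN_VALUE);
    }

    // A window covering [start, end) can fire once the combined watermark
    // has reached its last timestamp, end - 1.
    static boolean windowCanFire(long windowEnd, long[] perInputWatermarks) {
        return combined(perInputWatermarks) >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 600_000L; // end of the first 10-minute window, in ms

        // Two partitions with data, both already past the window end.
        long[] twoActive = {700_000L, 650_000L};
        System.out.println(windowCanFire(windowEnd, twoActive)); // true

        // Same, plus an empty partition that never advances its watermark.
        long[] withEmpty = {700_000L, 650_000L, Long.MIN_VALUE};
        System.out.println(windowCanFire(windowEnd, withEmpty)); // false
    }
}
```

With the empty partition included, the minimum never moves, which is why I
expected no output for any value of n.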

Best,
Yassine
