I think I figured out the explanation for the first part. It looks like the stream gets redistributed and merged between the source and the map operator because their parallelisms are different, so the messages coming out of the map operator are out of order. The "Timestamp monotony violated" warnings disappeared when I set the source and the map operator to the same parallelism.

I found out about operator chaining and tried to chain the source and map operators (as described here: https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#tasks--operator-chains) in order to have the same parallelism, but I didn't succeed. Isn't doing env.addSource().setParallelism(n).startNewChain().map(...).disableChaining() equivalent to setting the source and map parallelism to the same value?
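To illustrate the explanation above, here is a minimal plain-Java sketch (no Flink involved) of why redistributing two individually ordered partitions across several map subtasks can break timestamp order. All names in it (RebalanceOrderSketch, the burst size, the sample timestamps) are hypothetical, and the round-robin, burst-wise arrival order is only an assumption about how records might interleave, not a description of Flink's actual scheduling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RebalanceOrderSketch {

    /** Builds an ascending list of timestamps: start, start + step, ... */
    static List<Long> ascending(long start, long step, int count) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < count; i++) out.add(start + i * step);
        return out;
    }

    /** Counts records whose timestamp is lower than the highest one seen so far. */
    static int countViolations(List<Long> arrivals) {
        long current = Long.MIN_VALUE;
        int violations = 0;
        for (long ts : arrivals) {
            if (ts >= current) {
                current = ts;
            } else {
                violations++;
            }
        }
        return violations;
    }

    /** Same parallelism: source subtask i feeds only map subtask i. */
    static int forwardViolations(List<List<Long>> partitions) {
        int total = 0;
        for (List<Long> p : partitions) total += countViolations(p);
        return total;
    }

    /**
     * Different parallelism (assumed behaviour): each source deals its records
     * round-robin to every map subtask, and sources take turns emitting bursts.
     */
    static int rebalanceViolations(List<List<Long>> partitions, int mapParallelism, int burst) {
        List<List<Long>> inboxes = new ArrayList<>();
        for (int i = 0; i < mapParallelism; i++) inboxes.add(new ArrayList<>());
        int[] position = new int[partitions.size()]; // next record index per source
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (int s = 0; s < partitions.size(); s++) {
                List<Long> p = partitions.get(s);
                for (int b = 0; b < burst && position[s] < p.size(); b++) {
                    int target = position[s] % mapParallelism;
                    inboxes.get(target).add(p.get(position[s]));
                    position[s]++;
                    progressed = true;
                }
            }
        }
        int total = 0;
        for (List<Long> inbox : inboxes) total += countViolations(inbox);
        return total;
    }

    public static void main(String[] args) {
        // Two partitions, each ordered by timestamp on its own.
        List<List<Long>> partitions = Arrays.asList(ascending(0, 10, 16), ascending(5, 10, 16));
        System.out.println("forward violations:   " + forwardViolations(partitions));
        System.out.println("rebalance violations: " + rebalanceViolations(partitions, 4, 8));
    }
}
```

In this toy model the forward case never reports a violation, because each map subtask only ever sees one ordered partition, while the redistributed case does as soon as one source runs ahead of the other.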
On Mon, Aug 15, 2016 at 12:38 PM, Yassine Marzougui <[email protected]> wrote:
> Hi all,
>
> I have a Kafka topic with two partitions; messages within each partition
> are ordered in ascending timestamps.
>
> The following code works correctly (I'm running this on my local machine,
> the default parallelism is the number of cores = 8):
>
> stream = env.addSource(myFlinkKafkaConsumer09)
> stream.map(mapper)
>       .assignTimestampsAndWatermarks(ascendingTimestampExtractor)
>       .keyBy(0)
>       .timeWindow(Time.minutes(10))
>       .reduce(reducer)
>       .print()
>
> But if I explicitly set
> env.addSource(myFlinkKafkaConsumer09).setParallelism(n),
> where n > (number of partitions = 2) and n != 8, I get a bunch of "Timestamp
> monotony violated" warnings. My understanding is that only 2 sources will
> be mapped to the topic partitions, and since messages are ordered within
> each partition, timestamp assignment should happen correctly regardless of
> the parallelism as long as it is >= 2.
> *Question 1*: What is the explanation of this?
>
>
> Now I add another empty partition to the topic. The job doesn't give any
> output anymore, and that's expected since it keeps waiting forever for the
> empty partition's watermark. What I don't understand, though, is a
> strange behavior when I set the parallelism explicitly at the source:
> *Question 2*: Why am I able to get an output if I explicitly set
> env.addSource(myFlinkKafkaConsumer09).setParallelism(n)? Shouldn't the
> empty partition argument apply here too? And why is that output seen only
> when n != 8?
>
> Best,
> Yassine
>
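For reference, the "waiting forever for the empty partition's watermark" behavior described in the quoted message can be sketched in plain Java, assuming that a downstream operator advances its event time to the minimum watermark over all of its inputs. The class and method names below are hypothetical, and an input that never reports a watermark is modeled as Long.MIN_VALUE purely for illustration.

```java
public class MinWatermarkSketch {

    /** Assumed rule: an operator's watermark is the minimum over its inputs. */
    static long combinedWatermark(long[] inputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) min = Math.min(min, w);
        return min;
    }

    public static void main(String[] args) {
        long silent = Long.MIN_VALUE; // input that never reported a watermark

        // Two active inputs: event time advances to the slower of the two.
        System.out.println(combinedWatermark(new long[] {1000L, 2000L}));

        // One silent input pins the combined watermark, so windows never fire.
        System.out.println(combinedWatermark(new long[] {1000L, 2000L, silent}) == silent);
    }
}
```

Under this assumption, a window only fires once every input has moved past the window's end, which is why a single input without data can stall the whole job.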
