Hi Yassine,

In Flink 1.1 we've added a new feature to the Kafka consumer that lets you extract timestamps and emit watermarks per partition. The consumers now have the following method:

public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner)

With a timestamp extractor attached directly to the consumer, you don't need to worry about the parallelism of subsequent operators.

On Mon, Aug 15, 2016 at 4:56 PM, Yassine Marzougui <yassmar...@gmail.com> wrote:

> I think I also figured out the reason for the behavior I described when one
> Kafka partition is empty.
> According to the JavaDocs
> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#DataStream-org.apache.flink.streaming.api.environment.StreamExecutionEnvironment-org.apache.flink.streaming.api.transformations.StreamTransformation->,
> the datastream partitioning is set to *forward* by default, i.e. each map
> sub-task will receive data from exactly one source sub-task. For one of the
> stream partitions (corresponding to the empty Kafka partition) resulting
> from the map operator, the watermark does not advance, which makes the
> window operator wait forever.
> Now if the map and source operators have a different parallelism, Flink
> uses rebalance partitioning to redistribute the stream as pointed out in this
> mailing list thread
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Forward-Partitioning-same-Parallelism-1-1-communication-tp2373p2382.html>,
> therefore the watermark advances for all the stream partitions output from
> the map operator.
> Some of the details regarding the partitioning were mentioned in the 0.9
> docs
> <https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/streaming_guide.html#partitioning>,
> but unfortunately they aren't in the 1.x docs
> <https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#physical-partitioning>.
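
To make the watermark argument in the quoted explanation concrete, here is a toy model in plain Java. It is not Flink code, and the class, method, and constant names are made up for illustration. It relies only on the rule that an operator with several input channels advances its event-time clock to the minimum of the watermarks it has received on those channels, so a channel that never delivers a watermark holds everything back. The timestamp values are arbitrary.

```java
import java.util.Arrays;

/** Toy model of how a multi-input operator combines watermarks; not Flink code. */
final class WatermarkMinDemo {

    /** Stand-in for "no watermark received yet" on an input channel (an assumption of this sketch). */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** The operator's event-time clock is the minimum over all of its input channels. */
    static long combinedWatermark(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(NO_WATERMARK);
    }

    public static void main(String[] args) {
        // Forward partitioning: the third channel is fed only by the empty partition.
        long[] forward = {1_000L, 1_200L, NO_WATERMARK};
        // Rebalance: every channel also sees records from the non-empty partitions.
        long[] rebalanced = {1_000L, 1_200L, 1_100L};

        System.out.println(combinedWatermark(forward) == NO_WATERMARK); // prints "true": the window never fires
        System.out.println(combinedWatermark(rebalanced));              // prints "1000": event time advances
    }
}
```

In the forward case the minimum never moves, which matches the "window operator waits forever" observation. In the rebalanced case every channel reports progress, so the minimum moves too.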
>
> On Mon, Aug 15, 2016 at 12:38 PM, Yassine Marzougui <yassmar...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I have a Kafka topic with two partitions, messages within each partition
>> are ordered in ascending timestamps.
>>
>> The following code works correctly (I'm running this on my local machine,
>> the default parallelism is the number of cores=8):
>>
>> stream = env.addSource(myFlinkKafkaConsumer09)
>> stream.map(mapper)
>>       .assignTimestampsAndWatermarks(ascendingTimestampExtractor)
>>       .keyBy(0)
>>       .timeWindow(Time.minutes(10))
>>       .reduce(reducer)
>>       .print()
>>
>> But if I explicitly set
>> env.addSource(myFlinkKafkaConsumer09).setParallelism(n),
>> where n > (number of partitions = 2) and n != 8, I get a bunch of "Timestamp
>> monotony violated" warnings. My understanding is that only 2 sources will
>> be mapped to the topic partitions and since messages are ordered within
>> each partition, timestamp assignment should happen correctly regardless of
>> the parallelism as long as it is >= 2.
>> *Question 1*: What is the explanation of this?
>>
>>
>> Now I add another empty partition to the topic. The job doesn't give any
>> output anymore and that's expected since it keeps waiting forever for the
>> empty partition's watermark. What I don't understand though, is a
>> strange behavior when I set the parallelism explicitly at the source:
>> *Question 2*: Why am I able to get an output if I explicitly set
>> env.addSource(myFlinkKafkaConsumer09).setParallelism(n), shouldn't the
>> empty partition argument apply here too? And why is that output seen only
>> when n != 8?
>>
>> Best,
>> Yassine
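
Regarding Question 1 in the quoted message, here is a small stand-alone sketch of why the warnings can appear. It is plain Java rather than Flink code, and all names and timestamp values are hypothetical. It assumes that once the stream is redistributed between the source and the map, a single downstream sub-task receives records from both partitions interleaved. Each partition is ascending on its own, but the merged sequence is not, so an ascending-timestamp check placed after the redistribution reports violations.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of an ascending-timestamp check; not Flink code. */
final class MonotonyDemo {

    /** Counts elements whose timestamp is lower than the highest timestamp seen so far. */
    static int countViolations(List<Long> timestamps) {
        long maxSeen = Long.MIN_VALUE;
        int violations = 0;
        for (long ts : timestamps) {
            if (ts < maxSeen) {
                violations++;
            } else {
                maxSeen = ts;
            }
        }
        return violations;
    }

    /** Alternates between two inputs, roughly what one downstream sub-task may see after a redistribution. */
    static List<Long> interleave(List<Long> a, List<Long> b) {
        List<Long> merged = new ArrayList<>();
        for (int i = 0; i < Math.max(a.size(), b.size()); i++) {
            if (i < a.size()) merged.add(a.get(i));
            if (i < b.size()) merged.add(b.get(i));
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Long> partition0 = Arrays.asList(100L, 200L, 300L);
        List<Long> partition1 = Arrays.asList(10L, 20L, 30L);

        System.out.println(countViolations(partition0));                         // prints "0"
        System.out.println(countViolations(partition1));                         // prints "0"
        System.out.println(countViolations(interleave(partition0, partition1))); // prints "3"
    }
}
```

Checking each partition separately, which is what assigning timestamps inside the consumer amounts to, gives zero violations no matter how the records are redistributed afterwards.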