Hi Yassine,

In Flink 1.2 we've added a new feature to the Kafka consumer that allows you
to extract timestamps and emit watermarks per partition.
The consumers now have the following method:

    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(
            AssignerWithPunctuatedWatermarks<T> assigner)

With the timestamp extractor attached directly to the consumer, timestamps and
watermarks are generated per Kafka partition inside the source, so you don't
need to worry about the parallelism of subsequent operators.
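To illustrate why per-partition extraction helps, here is a simplified model in plain Java. It is not Flink's implementation, and the class and method names are made up for illustration. A single source sub-task reads two partitions, tracks the highest timestamp per partition, and uses the minimum across partitions as its watermark. Records from the two partitions may arrive interleaved without being flagged, whereas a single ascending extractor applied to the merged stream would flag the 20 arriving after the 100.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model, NOT Flink's implementation: one Kafka source sub-task that
// reads several partitions and tracks an ascending timestamp per partition.
final class PerPartitionWatermarks {
    // Highest timestamp seen so far in each partition (Long.MIN_VALUE = nothing yet).
    private final Map<Integer, Long> maxTimestampPerPartition = new HashMap<>();

    void addPartition(int partition) {
        maxTimestampPerPartition.put(partition, Long.MIN_VALUE);
    }

    // Returns false if the timestamp goes backwards *within its own partition*.
    boolean onRecord(int partition, long timestamp) {
        long previous = maxTimestampPerPartition.get(partition);
        if (timestamp < previous) {
            return false;
        }
        maxTimestampPerPartition.put(partition, timestamp);
        return true;
    }

    // The sub-task's watermark is the minimum over its partitions, so a
    // partition that has produced nothing keeps it at Long.MIN_VALUE.
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long ts : maxTimestampPerPartition.values()) {
            min = Math.min(min, ts);
        }
        return min;
    }

    public static void main(String[] args) {
        PerPartitionWatermarks subtask = new PerPartitionWatermarks();
        subtask.addPartition(0);
        subtask.addPartition(1);
        // {partition, timestamp} pairs: interleaved and out of global order,
        // but ascending within each partition, so nothing is flagged.
        long[][] arrivals = {{0, 100}, {1, 20}, {0, 110}, {1, 30}};
        boolean allAscending = true;
        for (long[] record : arrivals) {
            allAscending &= subtask.onRecord((int) record[0], record[1]);
        }
        System.out.println("ascending per partition: " + allAscending); // true
        System.out.println("watermark: " + subtask.currentWatermark());  // 30
    }
}
```

Note that in this model an empty partition still holds the watermark at Long.MIN_VALUE, because the minimum is taken over all partitions the sub-task reads.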


On Mon, Aug 15, 2016 at 4:56 PM, Yassine Marzougui <yassmar...@gmail.com>
wrote:

> I think I also figured out the reason for the behavior I described when one
> Kafka partition is empty.
> According to the JavaDocs
> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#DataStream-org.apache.flink.streaming.api.environment.StreamExecutionEnvironment-org.apache.flink.streaming.api.transformations.StreamTransformation->,
> the datastream partitioning is set to *forward* by default, i.e. each map
> sub-task will receive data from exactly one source sub-task. For one of the
> stream partitions (the one corresponding to the empty Kafka partition)
> produced by the map operator, the watermark never advances. Since the window
> operator's watermark is the minimum of the watermarks of all its inputs, it
> waits forever.
> If, however, the map and source operators have different parallelisms, Flink
> uses rebalance partitioning to redistribute the stream, as pointed out in this
> mailing list thread
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Forward-Partitioning-same-Parallelism-1-1-communication-tp2373p2382.html>,
> so every map sub-task receives records and the watermark advances for all
> the stream partitions output by the map operator.
> Some of the details regarding partitioning were mentioned in the 0.9
> docs
> <https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/streaming_guide.html#partitioning>,
> but unfortunately they aren't in the 1.x docs
> <https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#physical-partitioning>.
>
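A toy model of the effect described above, in plain Java rather than Flink code. The class name, the one-partition-per-source-sub-task setup, and the round-robin starting at channel 0 are simplifying assumptions. Each timestamp-assigner sub-task's watermark is taken to be the highest timestamp it has seen (Long.MIN_VALUE if none), and the window operator's watermark is the minimum over all assigner sub-tasks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model, NOT Flink code: forward vs. rebalance partitioning between
// source sub-tasks and timestamp-assigner sub-tasks of equal count.
final class WatermarkStallModel {

    // partitions.get(i) holds the timestamps read by source sub-task i.
    static long windowWatermark(List<long[]> partitions, boolean rebalance) {
        int parallelism = partitions.size();
        long[] assignerWatermark = new long[parallelism];
        Arrays.fill(assignerWatermark, Long.MIN_VALUE);
        for (int source = 0; source < parallelism; source++) {
            int nextChannel = 0; // assumed round-robin counter of this source sub-task
            for (long ts : partitions.get(source)) {
                // Forward keeps the same index; rebalance cycles over all assigners.
                int target = rebalance ? nextChannel++ % parallelism : source;
                assignerWatermark[target] = Math.max(assignerWatermark[target], ts);
            }
        }
        // The window operator only advances to the minimum of its inputs.
        long min = Long.MAX_VALUE;
        for (long wm : assignerWatermark) {
            min = Math.min(min, wm);
        }
        return min;
    }

    public static void main(String[] args) {
        List<long[]> partitions = new ArrayList<>();
        partitions.add(new long[] {10, 20, 30});
        partitions.add(new long[] {5, 15, 25});
        partitions.add(new long[] {}); // the empty partition
        System.out.println("forward:   " + windowWatermark(partitions, false)); // -9223372036854775808 (Long.MIN_VALUE): stuck
        System.out.println("rebalance: " + windowWatermark(partitions, true));  // 10
    }
}
```

With forward partitioning the assigner paired with the empty partition never sees a record, so the minimum stays at Long.MIN_VALUE; with rebalance every assigner receives records from the non-empty partitions.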
> On Mon, Aug 15, 2016 at 12:38 PM, Yassine Marzougui <yassmar...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I have a Kafka topic with two partitions; messages within each partition
>> are ordered by ascending timestamp.
>>
>> The following code works correctly (I'm running this on my local machine,
>> where the default parallelism is the number of cores, i.e. 8):
>>
>> stream = env.addSource(myFlinkKafkaConsumer09)
>> stream.map(mapper)
>>   .assignTimestampsAndWatermarks(ascendingTimestampExtractor)
>>   .keyBy(0)
>>   .timeWindow(Time.minutes(10))
>>   .reduce(reducer)
>>   .print()
>>
>> But if I explicitly set
>> env.addSource(myFlinkKafkaConsumer09).setParallelism(n),
>> where n > 2 (the number of partitions) and n != 8, I get a bunch of
>> "Timestamp monotony violated" warnings. My understanding is that only 2
>> source sub-tasks will be assigned the topic partitions, and since messages
>> are ordered within each partition, timestamp assignment should happen
>> correctly regardless of the parallelism as long as it is >= 2.
>> *Question 1*: What is the explanation for this?
>>
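The same kind of toy model can illustrate the "Timestamp monotony violated" warnings. When the source parallelism differs from the map parallelism, rebalance mixes records from both partitions into each assigner sub-task, so a record from a lagging partition can follow a later one from the other partition. This is a plain-Java sketch under assumed conditions (sources emitting alternately, round-robin starting at channel 0, made-up timestamps), not Flink's code.

```java
import java.util.Arrays;

// Toy model, NOT Flink code: counts records an ascending-timestamp check
// would flag in each assigner sub-task, under forward vs. rebalance.
final class MonotonyModel {

    // partitions[s] = ascending timestamps of source sub-task s.
    // Forward assumes partitions.length <= assigners (source s feeds assigner s).
    static int countViolations(long[][] partitions, int assigners, boolean rebalance) {
        long[] lastSeen = new long[assigners];
        Arrays.fill(lastSeen, Long.MIN_VALUE);
        int[] nextChannel = new int[partitions.length]; // round-robin counter per source
        int longest = 0;
        for (long[] p : partitions) {
            longest = Math.max(longest, p.length);
        }
        int violations = 0;
        for (int i = 0; i < longest; i++) {            // sources emit alternately
            for (int source = 0; source < partitions.length; source++) {
                if (i >= partitions[source].length) {
                    continue;
                }
                long ts = partitions[source][i];
                int target = rebalance ? nextChannel[source]++ % assigners : source;
                if (ts < lastSeen[target]) {
                    violations++;                      // would trigger the warning
                }
                lastSeen[target] = Math.max(lastSeen[target], ts);
            }
        }
        return violations;
    }

    public static void main(String[] args) {
        long[][] partitions = {
            {100, 101, 102, 103, 104, 105, 106, 107}, // partition 0
            {1, 2, 3, 4, 5, 6, 7, 8}                   // partition 1 lags behind
        };
        System.out.println("forward:   " + countViolations(partitions, 8, false)); // 0
        System.out.println("rebalance: " + countViolations(partitions, 8, true));  // 8
    }
}
```

Under forward partitioning (source parallelism equal to the map parallelism of 8) each assigner sees a single ascending partition; under rebalance every assigner here receives one record from each partition, and the lagging one is flagged.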
>>
>> Now I add another, empty partition to the topic. The job doesn't produce any
>> output anymore, which is expected since it keeps waiting forever for the
>> empty partition's watermark. What I don't understand, though, is a
>> strange behavior when I set the parallelism explicitly at the source:
>> *Question 2*: Why do I get output if I explicitly set
>> env.addSource(myFlinkKafkaConsumer09).setParallelism(n)? Shouldn't the
>> empty-partition argument apply here too? And why is that output seen only
>> when n != 8?
>>
>> Best,
>> Yassine
>>
>
>
