Good to hear that :)

Duplicated “uncommitted” messages are normal and to be expected. After all, 
that’s what `read_uncommitted` is for: it lets a consumer read messages 
without waiting for their transactions to be committed, and therefore it also 
sees messages whose transactions were later aborted.
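
For example, a downstream consumer that should only see committed records has 
to set `isolation.level` explicitly. A minimal sketch with the plain Kafka 
Java client (broker address, group id and topic name below are just 
placeholders, not taken from this thread):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ReadCommittedCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.setProperty("group.id", "sink-topic-check");        // placeholder group id
            props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            // Only return records from committed transactions; the default,
            // read_uncommitted, also returns records from open or aborted ones.
            props.setProperty("isolation.level", "read_committed");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("sink-topic")); // placeholder topic
                ConsumerRecords<String, String> records = consumer.poll(1000);
                records.forEach(r -> System.out.println(r.value()));
            }
        }
    }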

Piotrek

> On 1 Dec 2018, at 14:44, Nastaran Motavali <n.motav...@son.ir> wrote:
> 
> Thanks for your helpful response,
> Setting the consumer's 'isolation.level' property to 'read_committed' solved 
> the problem!
> In fact, there are still some duplicated messages in the sink topic, but they 
> are uncommitted, so when a Kafka consumer reads from this sink the duplicates 
> are not returned and everything is OK.
> 
> 
> 
> Kind regards,
> Nastaran Motavalli
> 
> 
> From: Piotr Nowojski <pi...@data-artisans.com>
> Sent: Thursday, November 29, 2018 3:38:38 PM
> To: Nastaran Motavali
> Cc: user@flink.apache.org
> Subject: Re: Duplicated messages in kafka sink topic using flink 
> cancel-with-savepoint operation
>  
> Hi Nastaran,
> 
> When you are checking for duplicated messages, are you reading from Kafka in 
> `read_committed` mode? (This is not the default value.)
> 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producer-partitioning-scheme
> 
> > Semantic.EXACTLY_ONCE: uses Kafka transactions to provide exactly-once 
> > semantic. Whenever you write to Kafka using
> > transactions, do not forget about setting desired isolation.level 
> > (read_committed or read_uncommitted - the latter one is the
> > default value) for any application consuming records from Kafka.
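> 
> To make that concrete, a minimal sketch of such a job with the 0.11 connector 
> could look roughly like the following (broker address, topic names, group id, 
> the transformation, checkpoint interval and transaction timeout are 
> placeholder values, not taken from your job; the transaction timeout must 
> stay at or below the broker's transaction.max.timeout.ms):
> 
>     import java.util.Properties;
>     import org.apache.flink.api.common.functions.MapFunction;
>     import org.apache.flink.api.common.serialization.SimpleStringSchema;
>     import org.apache.flink.streaming.api.datastream.DataStream;
>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>     import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
>     import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
>     import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
> 
>     public class ExactlyOnceKafkaJob {
>         public static void main(String[] args) throws Exception {
>             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>             // EXACTLY_ONCE Kafka writes are only committed on checkpoints,
>             // so checkpointing must be enabled.
>             env.enableCheckpointing(10_000);
> 
>             Properties consumerProps = new Properties();
>             consumerProps.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
>             consumerProps.setProperty("group.id", "my-job");                  // placeholder group
> 
>             DataStream<String> source = env.addSource(
>                 new FlinkKafkaConsumer011<String>("source-topic", new SimpleStringSchema(), consumerProps));
> 
>             Properties producerProps = new Properties();
>             producerProps.setProperty("bootstrap.servers", "localhost:9092");
>             // Must not exceed the broker's transaction.max.timeout.ms.
>             producerProps.setProperty("transaction.timeout.ms", "900000");
> 
>             source
>                 .map(new MapFunction<String, String>() {
>                     @Override
>                     public String map(String value) {
>                         return value.toUpperCase(); // placeholder transformation
>                     }
>                 })
>                 .addSink(new FlinkKafkaProducer011<String>(
>                     "sink-topic", // placeholder topic
>                     new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()),
>                     producerProps,
>                     FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));
> 
>             env.execute("exactly-once kafka job");
>         }
>     }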
> 
> Does the problem happen every time?
> 
> Piotrek
> 
>> On 28 Nov 2018, at 08:56, Nastaran Motavali <n.motav...@son.ir> wrote:
>> 
>> Hi,
>> I have a Flink streaming job implemented in Java which reads messages from 
>> a Kafka topic, transforms them, and finally sends them to another Kafka 
>> topic.
>> The Flink version is 1.6.2 and the Kafka version is 0.11. I pass the 
>> Semantic.EXACTLY_ONCE parameter to the producer. The problem is that when I 
>> cancel the job with a savepoint and then restart it from that savepoint, I 
>> get duplicated messages in the sink.
>> Am I missing some Kafka/Flink configuration to avoid the duplication?
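>> 
>> The cancel-with-savepoint and restore steps are roughly the following with 
>> the standard Flink CLI (savepoint directory, savepoint path and job id are 
>> placeholders):
>> 
>>     bin/flink cancel -s /tmp/savepoints <jobId>
>>     bin/flink run -s /tmp/savepoints/savepoint-<generated-id> my-job.jar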
>> 
>> 
>> Kind regards,
>> Nastaran Motavalli
