Good to hear that :) Duplicated "uncommitted" messages are normal and to be expected. After all, that's what `read_uncommitted` is for - it lets you read messages without waiting until they are committed, and thus even if their transaction was later aborted.
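For anyone hitting the same issue, here is a minimal sketch of a plain Kafka consumer that filters out records from aborted transactions. The broker address, group id and topic name are placeholders, not taken from this thread; the poll(long) call is the Kafka 0.11 API mentioned below:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReadCommittedCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "duplicate-check");         // placeholder
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The important bit: deliver only records from committed transactions;
        // records from aborted transactions are filtered out by the consumer.
        // The default is read_uncommitted, which also returns the "duplicates".
        props.setProperty("isolation.level", "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("sink-topic")); // placeholder topic
            ConsumerRecords<String, String> records = consumer.poll(1000); // Kafka 0.11 API
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("%d: %s%n", record.offset(), record.value());
            }
        }
    }
}
```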
Piotrek

> On 1 Dec 2018, at 14:44, Nastaran Motavali <n.motav...@son.ir> wrote:
>
> Thanks for your helpful response,
> Setting the consumer's 'isolation.level' property to 'read_committed' solved the problem!
> In fact, there are still some duplicated messages in the sink topic, but they are uncommitted, so if a Kafka consumer reads the messages from this sink, the duplicated messages are not read and everything is OK.
>
> Kind regards,
> Nastaran Motavalli
>
> From: Piotr Nowojski <pi...@data-artisans.com>
> Sent: Thursday, November 29, 2018 3:38:38 PM
> To: Nastaran Motavali
> Cc: user@flink.apache.org
> Subject: Re: Duplicated messages in kafka sink topic using flink cancel-with-savepoint operation
>
> Hi Nastaran,
>
> When you are checking for duplicated messages, are you reading from Kafka using `read_committed` mode (this is not the default value)?
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producer-partitioning-scheme
>
>> Semantic.EXACTLY_ONCE: uses Kafka transactions to provide exactly-once semantics. Whenever you write to Kafka using transactions, do not forget about setting the desired isolation.level (read_committed or read_uncommitted - the latter is the default value) for any application consuming records from Kafka.
>
> Does the problem always happen?
>
> Piotrek
>
>> On 28 Nov 2018, at 08:56, Nastaran Motavali <n.motav...@son.ir> wrote:
>>
>> Hi,
>> I have a Flink streaming job implemented in Java which reads some messages from a Kafka topic, transforms them, and finally sends them to another Kafka topic.
>> The Flink version is 1.6.2 and the Kafka version is 0.11. I pass the Semantic.EXACTLY_ONCE parameter to the producer. The problem is that when I cancel the job with a savepoint and then restart it using the saved savepoint, I see duplicated messages in the sink.
>> Am I missing some Kafka/Flink configurations to avoid duplication?
>>
>> Kind regards,
>> Nastaran Motavali
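P.S. For reference, a minimal sketch of the producer side discussed in this thread: a FlinkKafkaProducer011 created with Semantic.EXACTLY_ONCE, assuming the Flink 1.6.x connector API. The job name, topic name, broker address and the trivial source are placeholders; a real job would plug in its own source and transformation:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing must be enabled: EXACTLY_ONCE commits the Kafka
        // transaction as part of each completed checkpoint.
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        // Must not exceed transaction.max.timeout.ms on the broker
        // (the broker default is 15 minutes).
        props.setProperty("transaction.timeout.ms", "900000");

        // Placeholder source standing in for the real "read + transform" pipeline.
        DataStream<String> transformed = env.fromElements("a", "b", "c");

        transformed.addSink(new FlinkKafkaProducer011<>(
                "sink-topic", // placeholder
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                props,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));

        env.execute("exactly-once-sink");
    }
}
```

With this setup, consumers of "sink-topic" must use isolation.level=read_committed (as above) to skip records left behind by aborted transactions, e.g. after cancel-with-savepoint.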