Thanks!

Tim Ward

-----Original Message-----
From: Bruno Cadonna <br...@confluent.io>
Sent: 12 August 2019 14:18
To: users@kafka.apache.org
Subject: Re: KSTREAM-AGGREGATE-STATE-STORE persistence?

Hi Tim,

Kafka Streams guarantees at-least-once processing semantics by
default. That means a record is processed (e.g. added to an
aggregate) at least once but might be processed multiple times.
Crashes like the one you described are what cause the same record
to be processed more than once. Exactly-once processing guarantees
need to be explicitly turned on in Kafka Streams.
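
As a minimal sketch of turning this on, the configuration below sets
the "processing.guarantee" property to "exactly_once" (the default is
"at_least_once"). The class name, application id and broker address
are hypothetical placeholders, and plain string keys are used so the
snippet compiles without the Kafka libraries; in a real application
the same keys are available as constants on StreamsConfig.

```java
import java.util.Properties;

public class ExactlyOnceConfigSketch {

    // Builds a Kafka Streams configuration with exactly-once processing enabled.
    // The applicationId and bootstrapServers values are supplied by the caller.
    static Properties buildConfig(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // With "exactly_once", input offsets, state store changelog updates and
        // output records are committed together in one transaction.
        props.setProperty("processing.guarantee", "exactly_once");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical values, for illustration only.
        Properties props = buildConfig("aggregation-app", "localhost:9092");
        System.out.println(props.getProperty("processing.guarantee"));
    }
}
```

The resulting Properties object would then be passed to the
KafkaStreams constructor together with the topology. Without this
setting, the aggregation itself has to tolerate replayed records.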

See the following links for more information:
https://kafka.apache.org/23/documentation/streams/core-concepts#streams_processing_guarantee
https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
https://www.confluent.io/blog/enabling-exactly-once-kafka-streams/

Best,
Bruno

On Mon, Aug 12, 2019 at 2:38 PM Tim Ward
<tim.w...@origamienergy.com.invalid> wrote:
>
> I believe I have witnessed - at least twice - something like the following 
> happening, in a Kafka Streams application where I have a 
> .groupByKey().windowedBy().aggregate() sequence.
>
>
>   *   Application runs for a while
>   *   Application crashes
>   *   Application restarts
>   *   Aggregator.apply() is called to aggregate an input message *that has 
> already been included in the aggregate*
>
> It looks like when the application crashes, the KSTREAM-AGGREGATE-STATE-STORE 
> has been persisted after message X has been aggregated but message X has not 
> been committed back to the original source topic.
>
> So on restart message X gets read and processed again, and gets aggregated a 
> second time into the same aggregate.
>
> Now that I know this is happening (it was spotted by what I thought was some 
> over-the-top paranoid validation code) I can cope with it, and it is possible 
> to make the aggregation operation idempotent, because of the structure of the 
> particular operation I'm doing ... but what if the aggregation had been 
> something like a simple counting or totalling operation? How would anyone 
> know the original input message(s) had been aggregated more than once?
>
> So, my question:
>
> Am I correct in diagnosing that persisting the state store and committing the 
> original source message are not carried out atomically, and one has to expect 
> the same message can be applied to the same aggregate multiple times, and if 
> one cares about this one has to detect it happening and make the aggregation 
> process idempotent? I don't see this explained in the JavaDoc for either 
> Aggregator (or Reducer, where presumably it also applies).
>
> Tim Ward
>
> This email is from Origami Energy Limited. The contents of this email and any 
> attachment are confidential to the intended recipient(s). If you are not an 
> intended recipient: (i) do not use, disclose, distribute, copy or publish 
> this email or its contents; (ii) please contact Origami Energy Limited 
> immediately; and then (iii) delete this email. For more information, our 
> privacy policy is available here: https://origamienergy.com/privacy-policy/. 
> Origami Energy Limited (company number 8619644) is a company registered in 
> England with its registered office at Ashcombe Court, Woolsack Way, 
> Godalming, GU7 1LQ.
