Yes, sounds like it. We ran into problems at exactly the same spot using BEAM
as well, although in that case it resulted in data loss.

Thank you, Matthias. Doesn't sound like it's going to be resolved any time
soon, does it?

/Artur

On Mon, Dec 18, 2017 at 8:11 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> I think you are hitting: https://issues.apache.org/jira/browse/KAFKA-4609
>
>
> -Matthias
>
> On 12/18/17 1:52 AM, Artur Mrozowski wrote:
> > Hi Bill,
> >
> >  By duplicates I actually mean completely identical records. I can
> > observe them when I convert the result of a left join between KTables to a
> > stream. The resulting stream will often contain identical messages.
> > For example we have
> >
> > KTable<String,ArrayList> left
> > {"claimcounter": 0, "claimreporttime": 55948.33110985625, "claimnumber":
> > "3_0", "claimtime": 708.521153490306}
> >
> > and KTable <String,ArrayList> right
> >
> > {"claimcounter": 0, "paytime": 55960.226718985265, "claimnumber": "3_0",
> > "payment": 847015.1437781961}
> >
> > When I left join these two objects, the result in the state store will be
> > an object containing two ArrayLists, left and right, like this
> >
> > {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
> >
> > But I want to continue processing the results by using groupBy and
> > aggregate, so I convert the result of the left join to a stream. Now the
> > resulting repartition and changelog topics will, most of the time, contain
> > two identical messages, like this
> >
> > {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
> > {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
> >
> >
> > and hence passing duplicates to the next operation.
> >
> > My question is what is the best practice to avoid that?
> >
> > https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L423
> >
> > Best regards
> > Artur
> >
> >
> > On Wed, Dec 13, 2017 at 3:42 PM, Bill Bejeck <b...@confluent.io> wrote:
> >
> >> Hi Artur,
> >>
> >> The most direct way for deduplication (I'm using the term deduplication to
> >> mean records with the same key, but not necessarily the same value, where
> >> later records are considered) is to set the CACHE_MAX_BYTES_BUFFERING_CONFIG
> >> setting to a value greater than zero.
> >>
> >> Your other option is to use the Processor API (PAPI) and, by writing your
> >> own logic in conjunction with a state store, determine what constitutes a
> >> duplicate and when to emit a record.  You could take the same approach in
> >> the DSL layer using a Transformer.
> >>
> >> HTH.
> >>
> >> Bill
> >>
> >> On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski <art...@gmail.com> wrote:
> >>
> >>> Hi
> >>> I run an app where I transform a KTable to a stream, then groupBy and
> >>> aggregate, and capture the results in a KTable again. That generates many
> >>> duplicates.
> >>>
> >>> I have played with exactly-once semantics, which seems to reduce
> >>> duplicates for records that should be unique. But I still get duplicates
> >>> on keys that have two or more records.
> >>>
> >>> I could not reproduce it on a small number of records, so I disabled
> >>> caching by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Sure enough, I
> >>> got loads of duplicates, even those previously eliminated by exactly-once
> >>> semantics. Now I have a hard time enabling it again on Confluent 3.3.
> >>>
> >>> But generally, what is the best deduplication strategy for Kafka Streams?
> >>>
> >>> Artur
> >>>
> >>
> >
>
>
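
For the identical-record case discussed in the thread, the "own logic in
conjunction with a state store" idea could look roughly like the plain-Java
sketch below. It is only an illustration under stated assumptions: the HashMap
stands in for a Kafka Streams key-value state store, and DedupSketch and
shouldForward are hypothetical names, not part of the Kafka Streams API. In a
real topology a check like this would presumably live inside a Transformer or
Processor backed by a fault-tolerant store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: suppress a record when its value is identical to the
// value last forwarded for the same key.
class DedupSketch {
    // Assumption: this map stands in for a persistent key-value state store.
    private final Map<String, String> lastForwarded = new HashMap<>();

    // Returns true if the record differs from the last one forwarded for
    // this key (or is the first for this key) and should go downstream.
    boolean shouldForward(String key, String value) {
        if (lastForwarded.containsKey(key)
                && lastForwarded.get(key).equals(value)) {
            return false; // identical to the previous record: drop it
        }
        lastForwarded.put(key, value); // remember what was forwarded
        return true;
    }

    public static void main(String[] args) {
        DedupSketch dedup = new DedupSketch();
        // Shortened, made-up values; the key mirrors claimnumber "3_0".
        String[][] records = {
            {"3_0", "{\"payment\":1}"},
            {"3_0", "{\"payment\":1}"}, // identical duplicate
            {"3_0", "{\"payment\":2}"}, // genuine update
        };
        List<String> forwarded = new ArrayList<>();
        for (String[] r : records) {
            if (dedup.shouldForward(r[0], r[1])) {
                forwarded.add(r[1]);
            }
        }
        System.out.println(forwarded.size() + " of " + records.length
                + " records forwarded");
    }
}
```

Note that this only drops a record that is identical to the previous one for
the same key; it keeps genuine updates, and it would forward a value again if
it reappears after a different one.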
