Hi Bill,

 I am actually referring to duplicates as completely identical records. I
can observe it when I convert result of left join between KTables to
stream. The resulting stream will often contain identical messages.
For example we have

KTable<String,ArrayList> left
{"claimcounter": 0, "claimreporttime": 55948.33110985625, "claimnumber":
"3_0", "claimtime": 708.521153490306}

and KTable <String,ArrayList> right

{"claimcounter": 0, "paytime": 55960.226718985265, "claimnumber": "3_0",
"payment": 847015.1437781961}

When I leftjoin theses two objects the result in the state store will be an
object containing two  ArrayLists left and right, like this

{"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}

But I want to continue processing the results by using groupBy and
aggregate so I convert reuslt of the leftjoin to stream. Now the resulting
reparation and changelog topics,most of the time, will contain two
identical messages, like this

{"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
{"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}


and hence passing duplicates to the next operation.

My question is what is the best practice to avoid that?

https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L423

Best regards
Artur


On Wed, Dec 13, 2017 at 3:42 PM, Bill Bejeck <b...@confluent.io> wrote:

> Hi Artur,
>
> The most direct way for deduplication (I'm using the term deduplication to
> mean records with the same key, but not necessarily the same value, where
> later records are considered) is to set the  CACHE_MAX_BYTES_BUFFERING_
> CONFIG
> setting to a value greater than zero.
>
> Your other option is to use the PAPI and by writing your own logic in
> conjunction with a state store determine what constitutes a duplicate and
> when to emit a record.  You could take the same approach in the DSL layer
> using a Transformer.
>
> HTH.
>
> Bill
>
> On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski <art...@gmail.com> wrote:
>
> > Hi
> > I run an app where I transform KTable to stream and then I groupBy and
> > aggregate and capture the results in KTable again. That generates many
> > duplicates.
> >
> > I have played with exactly once semantics that seems to reduce duplicates
> > for records that should be unique. But I still get duplicates on keys
> that
> > have two or more records.
> >
> > I could not reproduce it on small number of records so I disable caching
> by
> > setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Surely enough, I got
> loads
> > of duplicates, even these previously eliminated by exactly once
> semantics.
> > Now I have hard time to enable it again on Confluent 3.3.
> >
> > But, generally what it the best deduplication strategy for Kafka Streams?
> >
> > Artur
> >
>

Reply via email to