I think you are hitting: https://issues.apache.org/jira/browse/KAFKA-4609
-Matthias

On 12/18/17 1:52 AM, Artur Mrozowski wrote:
> Hi Bill,
>
> I am actually referring to duplicates as completely identical records. I
> can observe it when I convert the result of a left join between KTables to
> a stream. The resulting stream will often contain identical messages.
> For example, we have
>
> KTable<String, ArrayList> left
>
> {"claimcounter": 0, "claimreporttime": 55948.33110985625, "claimnumber": "3_0", "claimtime": 708.521153490306}
>
> and KTable<String, ArrayList> right
>
> {"claimcounter": 0, "paytime": 55960.226718985265, "claimnumber": "3_0", "payment": 847015.1437781961}
>
> When I left-join these two objects, the result in the state store will be an
> object containing two ArrayLists, left and right, like this:
>
> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
>
> But I want to continue processing the results using groupBy and
> aggregate, so I convert the result of the left join to a stream. Now the
> resulting repartition and changelog topics will, most of the time, contain
> two identical messages, like this:
>
> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
>
> and hence pass duplicates on to the next operation.
>
> My question is: what is the best practice to avoid that?
>
> https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L423
>
> Best regards
> Artur
>
> On Wed, Dec 13, 2017 at 3:42 PM, Bill Bejeck <b...@confluent.io> wrote:
>
>> Hi Artur,
>>
>> The most direct way for deduplication (I'm using the term deduplication to
>> mean records with the same key, but not necessarily the same value, where
>> later records are considered) is to set the
>> CACHE_MAX_BYTES_BUFFERING_CONFIG setting to a value greater than zero.
>>
>> Your other option is to use the Processor API and write your own logic, in
>> conjunction with a state store, to determine what constitutes a duplicate
>> and when to emit a record. You could take the same approach in the DSL
>> layer using a Transformer.
>>
>> HTH.
>>
>> Bill
>>
>> On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski <art...@gmail.com> wrote:
>>
>>> Hi,
>>> I run an app where I transform a KTable to a stream, and then I groupBy
>>> and aggregate and capture the results in a KTable again. That generates
>>> many duplicates.
>>>
>>> I have played with exactly-once semantics, which seems to reduce
>>> duplicates for records that should be unique. But I still get duplicates
>>> on keys that have two or more records.
>>>
>>> I could not reproduce it on a small number of records, so I disabled
>>> caching by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Sure enough, I
>>> got loads of duplicates, even those previously eliminated by exactly-once
>>> semantics. Now I am having a hard time enabling it again on Confluent 3.3.
>>>
>>> But, generally, what is the best deduplication strategy for Kafka Streams?
>>>
>>> Artur
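
For anyone skimming the thread: Bill's first suggestion (enabling the record cache) comes down to a couple of config values. The sketch below only builds the Properties object so it runs standalone; in a real app these would be passed to the KafkaStreams constructor. The 10 MB size and 30 s commit interval are illustrative values, not recommendations, and "cache.max.bytes.buffering" is the string behind the CACHE_MAX_BYTES_BUFFERING_CONFIG constant mentioned above.

```java
import java.util.Properties;

public class CachingConfigSketch {
    public static void main(String[] args) {
        // With a non-zero record cache, Kafka Streams coalesces consecutive
        // updates to the same key before forwarding them downstream, which
        // suppresses many (but not all) duplicate emissions.
        Properties props = new Properties();
        // Config key behind StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG.
        props.put("cache.max.bytes.buffering", String.valueOf(10 * 1024 * 1024L));
        // A longer commit interval gives the cache more time to coalesce
        // updates before they are flushed downstream.
        props.put("commit.interval.ms", "30000");
        System.out.println(props.getProperty("cache.max.bytes.buffering"));
    }
}
```

Note that caching only reduces duplicates opportunistically; it is not a correctness guarantee.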
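
Bill's second suggestion (a state store plus your own dedup logic, via the Processor API or a Transformer) can be sketched as follows. This is a minimal, Kafka-free illustration: a HashMap stands in for what would be a persistent KeyValueStore inside a Transformer, and forwardIfNew stands in for the transform() method; the method and class names are hypothetical. The idea is simply to remember the last value seen per key and drop records whose value has not changed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DedupSketch {
    // Stand-in for a Kafka Streams state store: last value seen per key.
    static final Map<String, String> lastSeen = new HashMap<>();

    // Forward a record only if its value differs from the last one stored
    // for the same key; identical repeats are suppressed.
    static List<String> forwardIfNew(String key, String value) {
        List<String> out = new ArrayList<>();
        if (!value.equals(lastSeen.get(key))) {
            lastSeen.put(key, value);
            out.add(value);
        }
        return out; // empty list == duplicate dropped
    }

    public static void main(String[] args) {
        String json = "{\"claimnumber\":\"3_0\",\"payment\":847015.14}";
        System.out.println(forwardIfNew("3_0", json).size()); // first copy: forwarded
        System.out.println(forwardIfNew("3_0", json).size()); // identical repeat: dropped
    }
}
```

In a real topology the map would be a changelog-backed state store so the dedup survives restarts, and you would also need a policy for expiring old entries (e.g. punctuate-based cleanup) so the store does not grow without bound.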