Hi Bill,

By duplicates I actually mean completely identical records. I can observe them when I convert the result of a left join between KTables to a stream: the resulting stream will often contain identical messages. For example, we have
KTable<String, ArrayList> left:

{"claimcounter": 0, "claimreporttime": 55948.33110985625, "claimnumber": "3_0", "claimtime": 708.521153490306}

and KTable<String, ArrayList> right:

{"claimcounter": 0, "paytime": 55960.226718985265, "claimnumber": "3_0", "payment": 847015.1437781961}

When I left join these two objects, the result in the state store is an object containing two ArrayLists, left and right, like this:

{"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}

But I want to continue processing the result with groupBy and aggregate, so I convert the result of the left join to a stream. Now the resulting repartition and changelog topics will, most of the time, contain two identical messages, like this:

{"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
{"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}

and hence duplicates are passed on to the next operation. My question is: what is the best practice to avoid that?

https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L423

Best regards,
Artur

On Wed, Dec 13, 2017 at 3:42 PM, Bill Bejeck <b...@confluent.io> wrote:

> Hi Artur,
>
> The most direct way to deduplicate (I'm using the term deduplication to
> mean records with the same key, but not necessarily the same value, where
> only the later records are considered) is to set
> CACHE_MAX_BYTES_BUFFERING_CONFIG to a value greater than zero.
>
> Your other option is to use the Processor API (PAPI) and, by writing your
> own logic in conjunction with a state store, determine what constitutes a
> duplicate and when to emit a record. You could take the same approach in
> the DSL layer using a Transformer.
>
> HTH.
>
> Bill
>
> On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski <art...@gmail.com> wrote:
>
> > Hi,
> > I run an app where I transform a KTable to a stream and then groupBy and
> > aggregate and capture the results in a KTable again. That generates many
> > duplicates.
> >
> > I have played with exactly-once semantics, which seems to reduce
> > duplicates for records that should be unique. But I still get duplicates
> > on keys that have two or more records.
> >
> > I could not reproduce it on a small number of records, so I disabled
> > caching by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Sure enough, I
> > got loads of duplicates, even those previously eliminated by exactly-once
> > semantics. Now I am having a hard time enabling it again on Confluent 3.3.
> >
> > But generally, what is the best deduplication strategy for Kafka Streams?
> >
> > Artur
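To make the two suggestions above concrete, here are two sketches. They are not from the thread itself: the class name, the "dedup-store" name, the String serdes, and the config values are all illustrative, and the API shown is Kafka Streams 1.0 (on Confluent 3.3 / Kafka 0.11 the state store would be built with the older Stores.create(...) API instead).

The first option is purely configuration: keep record caching enabled so that consecutive updates to the same key are collapsed before being forwarded downstream.

    final Properties props = new Properties();
    // A non-zero cache lets consecutive updates to the same key collapse;
    // only the latest value is forwarded when the cache flushes.
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
    // Caches flush on commit, so the commit interval bounds how long an
    // update can be held back; 1000 ms is an illustrative value.
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

Note that this only collapses rapid successive updates; it does not guarantee that no duplicate ever reaches a downstream topic.

The second option, deduplicating explicitly in the DSL with a Transformer and a state store, could look roughly like the class below. It drops a record only when its value is identical to the last value seen for the same key, which matches the definition of a duplicate at the top of this thread. String values are used for brevity; a real payload type would need a meaningful equals() (or the comparison could be done on the serialized bytes).

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class IdenticalRecordFilter
            implements Transformer<String, String, KeyValue<String, String>> {

        private KeyValueStore<String, String> lastSeen;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            // "dedup-store" must be registered with the topology (see below)
            lastSeen = (KeyValueStore<String, String>) context.getStateStore("dedup-store");
        }

        @Override
        public KeyValue<String, String> transform(final String key, final String value) {
            final String previous = lastSeen.get(key);
            if (value != null && value.equals(previous)) {
                return null; // identical to the last record for this key: drop it
            }
            lastSeen.put(key, value);
            return KeyValue.pair(key, value); // new or changed value: forward it
        }

        @Override
        public KeyValue<String, String> punctuate(final long timestamp) {
            return null; // deprecated hook; nothing emitted on a schedule
        }

        @Override
        public void close() {
        }
    }

Wired in between toStream() and the grouping, where builder is the StreamsBuilder and joined is the left-join KTable (both names assumed):

    // Register the store that the transformer looks up in init().
    builder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("dedup-store"),
            Serdes.String(), Serdes.String()));

    final KStream<String, String> deduped = joined.toStream()
            .transform(IdenticalRecordFilter::new, "dedup-store");
    // continue with groupBy/groupByKey and aggregate as before

One caveat with this approach: the store keeps one entry per key indefinitely, so for an unbounded key space a windowed store or a scheduled cleanup would be needed to bound its size.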