I think you are hitting: https://issues.apache.org/jira/browse/KAFKA-4609


-Matthias

On 12/18/17 1:52 AM, Artur Mrozowski wrote:
> Hi Bill,
> 
>  I am actually referring to duplicates as completely identical records. I
> can observe it when I convert result of left join between KTables to
> stream. The resulting stream will often contain identical messages.
> For example we have
> 
> KTable<String,ArrayList> left
> {"claimcounter": 0, "claimreporttime": 55948.33110985625, "claimnumber":
> "3_0", "claimtime": 708.521153490306}
> 
> and KTable <String,ArrayList> right
> 
> {"claimcounter": 0, "paytime": 55960.226718985265, "claimnumber": "3_0",
> "payment": 847015.1437781961}
> 
> When I leftjoin theses two objects the result in the state store will be an
> object containing two  ArrayLists left and right, like this
> 
> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
> 
> But I want to continue processing the results by using groupBy and
> aggregate so I convert reuslt of the leftjoin to stream. Now the resulting
> reparation and changelog topics,most of the time, will contain two
> identical messages, like this
> 
> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
> 
> 
> and hence passing duplicates to the next operation.
> 
> My question is what is the best practice to avoid that?
> 
> https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L423
> 
> Best regards
> Artur
> 
> 
> On Wed, Dec 13, 2017 at 3:42 PM, Bill Bejeck <b...@confluent.io> wrote:
> 
>> Hi Artur,
>>
>> The most direct way for deduplication (I'm using the term deduplication to
>> mean records with the same key, but not necessarily the same value, where
>> later records are considered) is to set the  CACHE_MAX_BYTES_BUFFERING_
>> CONFIG
>> setting to a value greater than zero.
>>
>> Your other option is to use the PAPI and by writing your own logic in
>> conjunction with a state store determine what constitutes a duplicate and
>> when to emit a record.  You could take the same approach in the DSL layer
>> using a Transformer.
>>
>> HTH.
>>
>> Bill
>>
>> On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski <art...@gmail.com> wrote:
>>
>>> Hi
>>> I run an app where I transform KTable to stream and then I groupBy and
>>> aggregate and capture the results in KTable again. That generates many
>>> duplicates.
>>>
>>> I have played with exactly once semantics that seems to reduce duplicates
>>> for records that should be unique. But I still get duplicates on keys
>> that
>>> have two or more records.
>>>
>>> I could not reproduce it on small number of records so I disable caching
>> by
>>> setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Surely enough, I got
>> loads
>>> of duplicates, even these previously eliminated by exactly once
>> semantics.
>>> Now I have hard time to enable it again on Confluent 3.3.
>>>
>>> But, generally what it the best deduplication strategy for Kafka Streams?
>>>
>>> Artur
>>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to