[ https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675891#comment-16675891 ]

John Roesler commented on KAFKA-7595:
-------------------------------------

Not sure if I'm thinking about this properly, but...

For example, say you get some event <k, v1> on the left and <k, v2> on the right,
and you want to produce <k, (v1, v2)>. Let's say the left event arrives first.
 # With *caching disabled*, *whether or not the join is materialized*, I'd
expect to see <k, (v1, null)> followed by <k, (v1, v2)>. It's not clear to me
whether you would call this a "duplicate".
 # With *caching enabled* and the *join not materialized*, I'd expect to see 
duplicates: <k, (v1, v2)> followed by <k, (v1, v2)>.
 ** This could happen if <k, v1> gets cached on the left and <k, v2> gets 
cached on the right. They only trigger the join upon flush, and they trigger it 
independently, so when the left trigger happens, the right value is already 
visible and vice-versa, hence the duplicates.
 ** Even though the caches try to de-duplicate events, they are mutually 
oblivious (they don't know their results will later be used in a join, and they 
each don't know the other exists), so they can't cooperate to de-duplicate the 
results.
 # If the *join is materialized AND caching is enabled*, then there are still 
"technically" duplicate events, but they get de-duplicated by the join's cache.

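To make the first two cases concrete, here is a toy model in plain Java with no Kafka dependency. It is NOT how Kafka Streams actually implements record caches or KTable joins: the class names are hypothetical, and the assumptions that each side flushes independently and that a lookup can already see the other side's unflushed value are simplifications taken from the description of case 2.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (NOT Kafka Streams' implementation) of a KTable-KTable join whose two
// inputs each sit behind their own, mutually oblivious record cache.
class JoinCacheToyModel {
    // One join input: a committed store plus a cache of pending (unflushed) updates.
    static final class Side {
        final Map<String, String> store = new HashMap<>();
        final Map<String, String> pending = new HashMap<>();
        final boolean caching;

        Side(boolean caching) {
            this.caching = caching;
        }

        // Lookups read through the cache, so pending updates are already visible.
        String get(String key) {
            return pending.containsKey(key) ? pending.get(key) : store.get(key);
        }
    }

    final Side left;
    final Side right;
    final List<String> output = new ArrayList<>();

    JoinCacheToyModel(boolean caching) {
        left = new Side(caching);
        right = new Side(caching);
    }

    // Emits <k, (left, right)>; a missing side prints as null, as in case 1.
    private void join(String key) {
        output.add("<" + key + ", (" + left.get(key) + ", " + right.get(key) + ")>");
    }

    void update(Side side, String key, String value) {
        if (side.caching) {
            side.pending.put(key, value); // held back; the join is not triggered yet
        } else {
            side.store.put(key, value);
            join(key); // every update triggers the join immediately
        }
    }

    // Each cache flushes independently and triggers the join once per flushed key.
    void flush() {
        for (Side side : List.of(left, right)) {
            for (String key : new ArrayList<>(side.pending.keySet())) {
                side.store.put(key, side.pending.remove(key));
                join(key); // the other side's pending value is already visible here
            }
        }
    }

    public static void main(String[] args) {
        JoinCacheToyModel noCache = new JoinCacheToyModel(false);
        noCache.update(noCache.left, "k", "v1");
        noCache.update(noCache.right, "k", "v2");
        System.out.println(noCache.output); // [<k, (v1, null)>, <k, (v1, v2)>]

        JoinCacheToyModel cached = new JoinCacheToyModel(true);
        cached.update(cached.left, "k", "v1");
        cached.update(cached.right, "k", "v2");
        cached.flush();
        System.out.println(cached.output); // [<k, (v1, v2)>, <k, (v1, v2)>]
    }
}
```

The uncached model prints the sequence from case 1, and the cached one prints two identical <k, (v1, v2)> records as in case 2. Case 3 would correspond to a third cache after the join that keeps only the latest value per key, which this sketch leaves out.
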
*So with this in mind, can you clarify which of these expectations your results contradict?* 

*In particular, I wasn't sure if you are calling the sequence "<k, (v1, null)>, 
<k, (v1, v2)>" a "duplicate" or not. I _think_ this result would be expected 
for your cases 1 and 2.*
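
Applied to the averaging pipeline from the report, the same reasoning suggests that what looks like a duplicate may really be an intermediate result. Below is another plain-Java toy model (hypothetical class name, no Kafka dependency, NOT Kafka's implementation) of cases 1 and 2 with caching disabled, where both count() and reduce() forward every update to the inner join. The order in which the two aggregations see each rating is an assumption (count first, matching the order they are defined in the report); the real processing order is not established by this discussion.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (NOT Kafka's implementation) of the report's pipeline with caching
// disabled: each aggregation update is forwarded straight to an inner join.
class RatingAverageToyModel {
    final Map<Long, Long> counts = new HashMap<>();
    final Map<Long, Double> sums = new HashMap<>();
    final List<String> joinOutput = new ArrayList<>();

    // Inner join: emits sum / count only when both sides have a value for the key.
    private void join(long movieId) {
        Long count = counts.get(movieId);
        Double sum = sums.get(movieId);
        if (count != null && sum != null) {
            joinOutput.add(movieId + "=" + (sum / count.doubleValue()));
        }
    }

    // Assumption for illustration: the count is updated (and joined) before the sum.
    void rating(long movieId, double rating) {
        counts.merge(movieId, 1L, Long::sum);
        join(movieId);
        sums.merge(movieId, rating, Double::sum);
        join(movieId);
    }

    public static void main(String[] args) {
        RatingAverageToyModel model = new RatingAverageToyModel();
        model.rating(1L, 4.0);
        model.rating(1L, 2.0);
        System.out.println(model.joinOutput); // [1=4.0, 1=2.0, 1=3.0]
    }
}
```

The 2.0 in the output comes from the new count being joined with the old sum, so in this model the downstream table sees two updates per rating after the first one rather than a single final average.
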

 

And just a plug for a new feature: as of 2.1, if you'd like to de-duplicate
either sequence "<k, (v1, null)>, <k, (v1, v2)>" or "<k, (v1, v2)>, <k, (v1,
v2)>" without having to materialize the join, you can use the new "suppress"
operator: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
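
As a rough illustration of what suppression buys here, the toy model below (plain Java, hypothetical class name, NOT Kafka's implementation) buffers join results, keeps only the latest value per key, and emits it once a time limit has passed since the key was first buffered. In the real DSL this would presumably look something like {{ratingAverage.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(1), Suppressed.BufferConfig.unbounded()))}}; check the KIP and the 2.1 javadocs for the exact API and buffer-full behavior.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (NOT Kafka's implementation) of time-limited suppression:
// keep only the latest value per key and emit it once the key's timer expires.
class SuppressToyModel {
    // A buffered update: latest value plus the time the key was first buffered.
    private static final class Pending {
        String value;
        final long firstBufferedMs;

        Pending(String value, long firstBufferedMs) {
            this.value = value;
            this.firstBufferedMs = firstBufferedMs;
        }
    }

    private final long timeLimitMs;
    private final Map<String, Pending> buffer = new LinkedHashMap<>();
    final List<String> emitted = new ArrayList<>();

    SuppressToyModel(long timeLimitMs) {
        this.timeLimitMs = timeLimitMs;
    }

    // A newer update for a buffered key replaces the value but keeps the original timer.
    void update(String key, String value, long timestampMs) {
        Pending pending = buffer.get(key);
        if (pending == null) {
            buffer.put(key, new Pending(value, timestampMs));
        } else {
            pending.value = value;
        }
        advanceTime(timestampMs);
    }

    // Emits (and evicts) every key that has been buffered for at least timeLimitMs.
    void advanceTime(long nowMs) {
        Iterator<Map.Entry<String, Pending>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Pending> entry = it.next();
            if (nowMs - entry.getValue().firstBufferedMs >= timeLimitMs) {
                emitted.add("<" + entry.getKey() + ", " + entry.getValue().value + ">");
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        SuppressToyModel suppress = new SuppressToyModel(1000);
        suppress.update("k", "(v1, null)", 0);
        suppress.update("k", "(v1, v2)", 10);
        suppress.update("k", "(v1, v2)", 20);
        suppress.advanceTime(1000);
        System.out.println(suppress.emitted); // [<k, (v1, v2)>]
    }
}
```

Feeding it either duplicate-looking sequence within the time limit yields a single <k, (v1, v2)>. The trade-off is latency: nothing is emitted for a key until its timer expires.
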

> Kafka Streams: KTable to KTable join introduces duplicates in downstream 
> KTable
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-7595
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7595
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Vik Gamov
>            Priority: Major
>
> When performing a KTable-to-KTable join after an aggregation, there are
> duplicates in the resulting KTable.
> 1. caching disabled, not materialized => duplicates
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);}}
> {{KTable<Long, Long> ratingCounts = ratingsById.count();}}
> {{KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue());}}
> 2. caching disabled, materialized => duplicates
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);}}
> {{KTable<Long, Long> ratingCounts = ratingsById.count();}}
> {{KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue(),}}
> {{ Materialized.as("average-ratings"));}}
> 3. caching enabled, materialized => all good
> {{// Enable record cache of size 10 MB.}}
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);}}
> {{// Set commit interval to 1 second.}}
> {{streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);}}
> {{KTable<Long, Long> ratingCounts = ratingsById.count();}}
> {{KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue(),}}
> {{ Materialized.as("average-ratings"));}}
>  
> Demo app 
> [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
