[ 
https://issues.apache.org/jira/browse/KAFKA-7158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568970#comment-16568970
 ] 

Guozhang Wang commented on KAFKA-7158:
--------------------------------------

Hi [~cah6] I've tried your code on my laptop for 10+ times but still cannot 
reproduce this issue.. on the other hand, could you try replace

https://gist.github.com/cah6/adc2c52514f5386597a4bba6c429ff63#file-duplicateexample-java-L32

by using a random generated folder, such as

{code}
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
{code}

And try again?

> Duplicates when searching kafka stream state store with caching
> ---------------------------------------------------------------
>
>                 Key: KAFKA-7158
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7158
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Christian Henry
>            Priority: Major
>
> See mailing list email with same name for initial discussion, reposting my 
> initial email here for convenience:
> {noformat}
> We have a kafka stream application, and one of our transform steps keeps a 
> state store to filter out messages with a previously seen GUID. That is, our 
> transform looks like:
> public KeyValue<byte[], String> transform(byte[] key, String guid) {
>     try (WindowStoreIterator<DuplicateMessageMetadata> iterator = 
> duplicateStore.fetch(correlationId, start, now)) {
>         if (iterator.hasNext()) {
>             return null;
>         } else {
>             duplicateStore.put(correlationId, some metadata);
>             return new KeyValue<>(key, message);
>         }
>     }
> }
> where the duplicateStore is a persistent windowed store with caching enabled. 
> I was debugging some tests and found that sometimes when calling all() or 
> fetchAll() on the duplicate store and stepping through the iterator, it would 
> return the same guid more than once, even if it was only inserted into the 
> store once. More specifically, if I had the following guids sent to the 
> stream: [11111, 22222, ... 99999] (for 9 values total), sometimes it would 
> return 10 values, with one (or more) of the values being returned twice by 
> the iterator. However, this would not show up with a fetch(guid) on that 
> specific guid. For instance, if 11111 was being returned twice by fetchAll(), 
> calling duplicateStore.fetch("11111", start, end) will still return an 
> iterator with size of 1. 
> I dug into this a bit more by setting a breakpoint in 
> SegmentedCacheFunction#compareSegmentedKeys(cacheKey, storeKey) and watching 
> the two input values as I looped through the iterator using 
> "while(iterator.hasNext()) { print(iterator.next()) }". In one test, the 
> duplicate value was 66666, and saw the following behavior (trimming off the 
> segment values from the byte input):
> -- compareSegmentedKeys(cacheKey = 66666, storeKey = 22222)
> -- next() returns 66666
> and 
> -- compareSegmentedKeys(cacheKey = 77777, storeKey = 66666)
> -- next() returns 66666
> Besides those, the input values are the same and the output is as expected. 
> Additionally, a coworker noted that the number of duplicates always matches 
> the number of times Long.compare(cacheSegmentId, storeSegmentId) returns a 
> non-zero value, indicating that duplicates are likely arising due to the 
> segment comparison. {noformat}
>  
> Basically, what we're seeing is that if you have a persistent store with 
> caching enabled, you will sometimes get duplicate keys when querying for all 
> keys (using all() or fetchAll()) even though fetch(key) will only return one 
> result. That is, if you had a fresh store with nothing in it and did 
> something like:
> {code:java}
> IntStream.rangeClosed(1, 100).forEach(i -> store.put("key" + i, "value" + i));
> {code}
> then calling
> {code:java}
> store.fetchAll(start, end)
> {code}
> would return an iterator with MORE than 100 items, whereas if you explicitly 
> did
> {code:java}
> store.fetch("key" + i)
> {code}
> for i = 1 to 100, each fetch would only return a single item in the iterator. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to