[ 
https://issues.apache.org/jira/browse/KAFKA-7158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16570769#comment-16570769
 ] 

ASF GitHub Bot commented on KAFKA-7158:
---------------------------------------

guozhangwang opened a new pull request #5466: [NOT MERGE] KAFKA-7158: Add unit 
test for window store range queries
URL: https://github.com/apache/kafka/pull/5466
 
 
   While debugging the reported issue, I found that our current unit tests lack 
the coverage needed to expose the underlying root cause.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



> Duplicates when searching kafka stream state store with caching
> ---------------------------------------------------------------
>
>                 Key: KAFKA-7158
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7158
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Christian Henry
>            Priority: Major
>
> See the mailing list email with the same name for the initial discussion; I am 
> reposting my initial email here for convenience:
> {noformat}
> We have a kafka stream application, and one of our transform steps keeps a 
> state store to filter out messages with a previously seen GUID. That is, our 
> transform looks like:
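> public KeyValue<byte[], String> transform(byte[] key, String guid) {
>     // correlationId, start, now, metadata and message are defined elsewhere in the application
>     try (WindowStoreIterator<DuplicateMessageMetadata> iterator = duplicateStore.fetch(correlationId, start, now)) {
>         if (iterator.hasNext()) {
>             return null; // already seen within the window: drop the message
>         } else {
>             duplicateStore.put(correlationId, metadata); // remember it for later lookups
>             return new KeyValue<>(key, message);
>         }
>     }
> }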
> where the duplicateStore is a persistent windowed store with caching enabled. 
> I was debugging some tests and found that sometimes when calling all() or 
> fetchAll() on the duplicate store and stepping through the iterator, it would 
> return the same guid more than once, even if it was only inserted into the 
> store once. More specifically, if I had the following guids sent to the 
> stream: [11111, 22222, ... 99999] (for 9 values total), sometimes it would 
> return 10 values, with one (or more) of the values being returned twice by 
> the iterator. However, this would not show up with a fetch(guid) on that 
> specific guid. For instance, if 11111 was being returned twice by fetchAll(), 
> calling duplicateStore.fetch("11111", start, end) will still return an 
> iterator with size of 1. 
> I dug into this a bit more by setting a breakpoint in 
> SegmentedCacheFunction#compareSegmentedKeys(cacheKey, storeKey) and watching 
> the two input values as I looped through the iterator using 
> "while(iterator.hasNext()) { print(iterator.next()) }". In one test, the 
> duplicate value was 66666, and I saw the following behavior (trimming off the 
> segment values from the byte input):
> -- compareSegmentedKeys(cacheKey = 66666, storeKey = 22222)
> -- next() returns 66666
> and 
> -- compareSegmentedKeys(cacheKey = 77777, storeKey = 66666)
> -- next() returns 66666
> Besides those, the input values are the same and the output is as expected. 
> Additionally, a coworker noted that the number of duplicates always matches 
> the number of times Long.compare(cacheSegmentId, storeSegmentId) returns a 
> non-zero value, indicating that duplicates are likely arising due to the 
> segment comparison. {noformat}
>  
> Basically, what we're seeing is that if you have a persistent store with 
> caching enabled, you will sometimes get duplicate keys when querying for all 
> keys (using all() or fetchAll()) even though fetch(key) will only return one 
> result. That is, if you had a fresh store with nothing in it and did 
> something like:
> {code:java}
> IntStream.rangeClosed(1, 100).forEach(i -> store.put("key" + i, "value" + i));
> {code}
> then calling
> {code:java}
> store.fetchAll(start, end)
> {code}
> would return an iterator with MORE than 100 items, whereas if you explicitly 
> did
> {code:java}
> store.fetch("key" + i, start, end)
> {code}
> for i = 1 to 100, each fetch would only return a single item in the iterator. 
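
The check described above (walk the iterator returned by all() or fetchAll() and see which keys come back more than once) can be sketched in plain Java. This is only an illustrative helper, not code from the report or from Kafka: the class name DuplicateKeyCheck and the method findDuplicates are hypothetical, and it works on any Iterator of keys, so with a real store the keys would first have to be extracted from the windowed entries the iterator yields.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical diagnostic helper (not part of Kafka Streams).
public class DuplicateKeyCheck {

    // Counts how often each key is returned by the iterator and keeps
    // only the keys that were returned more than once, in first-seen order.
    public static <K> Map<K, Integer> findDuplicates(Iterator<K> keys) {
        Map<K, Integer> counts = new LinkedHashMap<>();
        while (keys.hasNext()) {
            counts.merge(keys.next(), 1, Integer::sum);
        }
        counts.values().removeIf(count -> count == 1);
        return counts;
    }

    public static void main(String[] args) {
        // Stand-in for the keys seen while stepping through fetchAll();
        // 66666 appears twice, as in the reported test run.
        List<String> returned =
            Arrays.asList("11111", "22222", "66666", "66666", "77777");
        System.out.println(findDuplicates(returned.iterator()));
    }
}
```

An empty result would mean that every key was returned exactly once; with the sample list above, the only entry reported is 66666 with a count of 2.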



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
