[ https://issues.apache.org/jira/browse/KAFKA-7158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16569349#comment-16569349 ]
Guozhang Wang commented on KAFKA-7158:
--------------------------------------

Actually, I made that claim too quickly :) I retried multiple times on trunk but cannot reproduce the issue. However, when running it on the 2.0 branch I can indeed reproduce it, which means that some patch on trunk has already fixed it. I browsed through the commits since 2.0 and narrowed it down to this one: https://issues.apache.org/jira/browse/KAFKA-7080

I can confirm that the commit right before this patch still reproduces the issue easily, and the commit right after it no longer does. Looking into the PR carefully, I realize it is actually a pretty critical bug fix: because of the wrong segmentInterval value, the segment id used in the cache could be wrong, so a put() call would land in the wrong cache segment, causing duplicate writes. This is what [~cah6] has observed.

[~vvcephei], since you fixed KAFKA-7080, could you also file a PR against the 2.0 branch, since it is a pretty critical bug?
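To make the failure mode concrete, here is a minimal standalone sketch (not the actual Kafka Streams code; the class name, interval values, and key layout are invented for illustration) of how deriving the segment id from an inconsistent segmentInterval makes the cache and the store file the same record under different segmented keys, so a merged iteration sees it twice:

{code:java}
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only -- not SegmentedCacheFunction itself.
public class SegmentIdSketch {

    // A segment id is derived from the record timestamp and the segment interval,
    // along the lines of segmentId(timestamp) = timestamp / segmentInterval.
    static long segmentId(long timestamp, long segmentInterval) {
        return timestamp / segmentInterval;
    }

    public static void main(String[] args) {
        long timestamp = 90_000L;

        // The store computes its segment id with one interval ...
        long storeSegmentInterval = 60_000L;
        // ... while the cache was handed a different (wrong) interval before the KAFKA-7080 fix.
        long wrongCacheSegmentInterval = 30_000L;

        long storeSegment = segmentId(timestamp, storeSegmentInterval);       // 1
        long cacheSegment = segmentId(timestamp, wrongCacheSegmentInterval);  // 3

        // Keys prefixed with different segment ids no longer compare as equal,
        // so the same logical record ends up under two distinct segmented keys.
        Map<String, String> mergedView = new TreeMap<>();
        mergedView.put(storeSegment + "/key-66666", "from store");
        mergedView.put(cacheSegment + "/key-66666", "from cache");

        // Iterating the merged view now yields the record twice -- the duplicate
        // that all()/fetchAll() returned in this ticket.
        mergedView.forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
{code}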
> Duplicates when searching kafka stream state store with caching
> ----------------------------------------------------------------
>
>                 Key: KAFKA-7158
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7158
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Christian Henry
>            Priority: Major
>
> See the mailing list email with the same name for the initial discussion; reposting my initial email here for convenience:
> {noformat}
> We have a Kafka Streams application, and one of our transform steps keeps a state store to filter out messages with a previously seen GUID. That is, our transform looks like:
>
> public KeyValue<byte[], String> transform(byte[] key, String guid) {
>     try (WindowStoreIterator<DuplicateMessageMetadata> iterator = duplicateStore.fetch(guid, start, now)) {
>         if (iterator.hasNext()) {
>             return null;
>         } else {
>             duplicateStore.put(guid, some metadata);
>             return new KeyValue<>(key, guid);
>         }
>     }
> }
>
> where duplicateStore is a persistent windowed store with caching enabled.
>
> I was debugging some tests and found that sometimes, when calling all() or fetchAll() on the duplicate store and stepping through the iterator, it would return the same GUID more than once, even though it was only inserted into the store once. More specifically, if I sent the following GUIDs to the stream: [11111, 22222, ... 99999] (9 values total), it would sometimes return 10 values, with one (or more) of the values being returned twice by the iterator. However, this would not show up with a fetch(guid) on that specific GUID. For instance, if 11111 was being returned twice by fetchAll(), calling duplicateStore.fetch("11111", start, end) would still return an iterator of size 1.
>
> I dug into this a bit more by setting a breakpoint in SegmentedCacheFunction#compareSegmentedKeys(cacheKey, storeKey) and watching the two input values as I looped through the iterator using "while (iterator.hasNext()) { print(iterator.next()) }". In one test, the duplicate value was 66666, and I saw the following behavior (trimming the segment values off the byte input):
> -- compareSegmentedKeys(cacheKey = 66666, storeKey = 22222)
> -- next() returns 66666
> and
> -- compareSegmentedKeys(cacheKey = 77777, storeKey = 66666)
> -- next() returns 66666
> Besides those, the input values are the same and the output is as expected.
> Additionally, a coworker noted that the number of duplicates always matches the number of times Long.compare(cacheSegmentId, storeSegmentId) returns a non-zero value, indicating that the duplicates likely arise from the segment comparison.
> {noformat}
>
> Basically, what we're seeing is that if you have a persistent store with caching enabled, you will sometimes get duplicate keys when querying for all keys (using all() or fetchAll()), even though fetch(key) will only return one result. That is, if you had a fresh store with nothing in it and did something like:
> {code:java}
> IntStream.rangeClosed(1, 100).forEach(i -> store.put("key" + i, "value" + i));
> {code}
> then calling
> {code:java}
> store.fetchAll(start, end)
> {code}
> would return an iterator with MORE than 100 items, whereas if you explicitly did
> {code:java}
> store.fetch("key" + i)
> {code}
> for i = 1 to 100, each fetch would only return a single item in the iterator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)