[
https://issues.apache.org/jira/browse/KAFKA-7158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Guozhang Wang resolved KAFKA-7158.
----------------------------------
Resolution: Fixed
Assignee: John Roesler
Fix Version/s: 2.1.0
               2.0.1
John has cherry-picked the minimal fix from his PR for KAFKA-7080 to 2.0, and we
have added a unit test to verify that it indeed fixes the issue.
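For reference, the sketch below shows the kind of check involved (it is not the actual
unit test that was added, just an illustrative stand-alone program): drive a topology
whose transformer records each GUID in a caching-enabled persistent window store, then
iterate fetchAll() and compare the total number of entries against the number of
distinct keys. Topic, store, and class names are illustrative, and it targets the
2.0.x Streams API (TopologyTestDriver plus the long-based Stores methods).
{code:java}
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class CachedWindowStoreFetchAllCheck {

    private static final String STORE = "duplicate-store"; // illustrative store name
    private static final String TOPIC = "input";           // illustrative topic name

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // Persistent window store with caching enabled, as described in the report.
        builder.addStateStore(
            Stores.windowStoreBuilder(
                    Stores.persistentWindowStore(STORE, 60_000L, 3, 60_000L, false),
                    Serdes.String(),
                    Serdes.String())
                .withCachingEnabled());

        // A transformer that records every GUID it sees in the window store.
        final TransformerSupplier<String, String, KeyValue<String, String>> supplier =
            () -> new Transformer<String, String, KeyValue<String, String>>() {
                private WindowStore<String, String> store;

                @SuppressWarnings("unchecked")
                @Override
                public void init(final ProcessorContext context) {
                    store = (WindowStore<String, String>) context.getStateStore(STORE);
                }

                @Override
                public KeyValue<String, String> transform(final String key, final String guid) {
                    store.put(guid, guid);
                    return KeyValue.pair(key, guid);
                }

                @Override
                public void close() { }
            };
        builder.<String, String>stream(TOPIC).transform(supplier, STORE);

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-7158-check");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // never contacted by the test driver
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final ConsumerRecordFactory<String, String> factory =
                new ConsumerRecordFactory<>(TOPIC, new StringSerializer(), new StringSerializer());
            for (int i = 1; i <= 9; i++) {
                final String guid = "guid-" + i; // nine distinct GUIDs, each inserted once
                driver.pipeInput(factory.create(TOPIC, guid, guid));
            }

            // Count total entries vs. distinct keys returned by fetchAll().
            final Set<String> distinct = new HashSet<>();
            int total = 0;
            final WindowStore<String, String> store = driver.getWindowStore(STORE);
            try (KeyValueIterator<Windowed<String>, String> it =
                     store.fetchAll(0L, System.currentTimeMillis() + 60_000L)) {
                while (it.hasNext()) {
                    distinct.add(it.next().key.key());
                    total++;
                }
            }
            // With the bug, 'total' could exceed 'distinct.size()'; after the fix they should match.
            System.out.println("distinct=" + distinct.size() + ", total=" + total);
        }
    }
}
{code}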
> Duplicates when searching kafka stream state store with caching
> ---------------------------------------------------------------
>
> Key: KAFKA-7158
> URL: https://issues.apache.org/jira/browse/KAFKA-7158
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.1.0
> Reporter: Christian Henry
> Assignee: John Roesler
> Priority: Major
> Fix For: 2.0.1, 2.1.0
>
>
> See the mailing list email with the same name for the initial discussion; I'm
> reposting my initial email here for convenience:
> {noformat}
> We have a Kafka Streams application, and one of our transform steps keeps a
> state store to filter out messages with a previously seen GUID. That is, our
> transform looks like:
> public KeyValue<byte[], String> transform(byte[] key, String guid) {
>     try (WindowStoreIterator<DuplicateMessageMetadata> iterator =
>             duplicateStore.fetch(guid, start, now)) {
>         if (iterator.hasNext()) {
>             return null;
>         } else {
>             duplicateStore.put(guid, metadata);  // metadata describing this message
>             return new KeyValue<>(key, guid);
>         }
>     }
> }
> where the duplicateStore is a persistent windowed store with caching enabled
> (see the store-definition sketch below).
> I was debugging some tests and found that sometimes, when calling all() or
> fetchAll() on the duplicate store and stepping through the iterator, it would
> return the same guid more than once, even though it was only inserted into
> the store once. More specifically, if I had the following guids sent to the
> stream: [11111, 22222, ... 99999] (for 9 values total), it would sometimes
> return 10 or more values, with one (or more) of the values being returned
> twice by the iterator. However, this would not show up with a fetch(guid) on
> that specific guid. For instance, if 11111 was being returned twice by
> fetchAll(), calling duplicateStore.fetch("11111", start, end) would still
> return an iterator with a size of 1.
> I dug into this a bit more by setting a breakpoint in
> SegmentedCacheFunction#compareSegmentedKeys(cacheKey, storeKey) and watching
> the two input values as I looped through the iterator using
> "while(iterator.hasNext()) { print(iterator.next()) }". In one test, the
> duplicate value was 66666, and I saw the following behavior (trimming off the
> segment values from the byte input):
> -- compareSegmentedKeys(cacheKey = 66666, storeKey = 22222)
> -- next() returns 66666
> and
> -- compareSegmentedKeys(cacheKey = 77777, storeKey = 66666)
> -- next() returns 66666
> Aside from those two comparisons, the input values were the same and the
> output was as expected. Additionally, a coworker noted that the number of
> duplicates always matches the number of times
> Long.compare(cacheSegmentId, storeSegmentId) returns a non-zero value,
> indicating that the duplicates likely arise from the segment comparison.
> {noformat}
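> As a point of reference, here is a minimal sketch of how a store like the
> duplicateStore above might be declared (store name, retention, window size,
> and the metadata serde are illustrative assumptions; this uses the pre-2.1,
> long-based Stores API):
> {code:java}
> // Illustrative only: a persistent window store with caching enabled, roughly
> // matching the duplicateStore described above.
> StoreBuilder<WindowStore<String, DuplicateMessageMetadata>> duplicateStoreBuilder =
>     Stores.windowStoreBuilder(
>             Stores.persistentWindowStore(
>                 "duplicate-store", // store name (illustrative)
>                 60_000L,           // retention period, ms
>                 3,                 // number of segments
>                 60_000L,           // window size, ms
>                 false),            // do not retain duplicates
>             Serdes.String(),
>             duplicateMetadataSerde) // hypothetical Serde<DuplicateMessageMetadata>
>         .withCachingEnabled();      // the caching layer involved in this issue
> 
> builder.addStateStore(duplicateStoreBuilder);
> // The transform step is then attached by store name, e.g.
> // stream.transform(DeduplicationTransformer::new, "duplicate-store"); // hypothetical transformer class
> {code}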
>
> Basically, what we're seeing is that if you have a persistent store with
> caching enabled, you will sometimes get duplicate keys when querying for all
> keys (using all() or fetchAll()) even though fetch(key) will only return one
> result. That is, if you had a fresh store with nothing in it and did
> something like:
> {code:java}
> IntStream.rangeClosed(1, 100).forEach(i -> store.put("key" + i, "value" + i));
> {code}
> then calling
> {code:java}
> store.fetchAll(start, end)
> {code}
> would return an iterator with MORE than 100 items, whereas if you explicitly
> did
> {code:java}
> store.fetch("key" + i, start, end)
> {code}
> for i = 1 to 100, each fetch would only return a single item in the iterator.
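> For completeness, a minimal sketch of that per-key check, assuming a
> WindowStore<String, String> handle named "store" and the same start/end
> bounds used above:
> {code:java}
> // Illustrative only: each key fetched individually over the same time range
> // should come back exactly once, even when fetchAll() shows duplicates.
> for (int i = 1; i <= 100; i++) {
>     try (WindowStoreIterator<String> iterator = store.fetch("key" + i, start, end)) {
>         int count = 0;
>         while (iterator.hasNext()) {
>             iterator.next();
>             count++;
>         }
>         System.out.println("key" + i + " -> " + count); // expected: 1
>     }
> }
> {code}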
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)