Hello Christian,

Since you are calling fetch(key, start, end), I'm assuming that duplicateStore is a WindowStore. With a windowed store, a single key can fall into multiple windows and hence be returned more than once when you scan the store: note that all() and fetchAll() return a KeyValueIterator<Windowed<K>, V>, so each (key, window) pair is a distinct entry.
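To make the multiple-windows point concrete, here is a minimal, self-contained sketch (plain Java, no Kafka dependency -- the WindowedKey record and the 60-second window size are illustrative assumptions, not the actual Kafka Streams types) of why a scan over windowed entries can surface one record key more than once:

```java
import java.util.Map;
import java.util.TreeMap;

public class WindowedKeyDemo {
    // Hypothetical stand-in for Kafka Streams' Windowed<K>: record key plus window start.
    record WindowedKey(String key, long windowStart) implements Comparable<WindowedKey> {
        public int compareTo(WindowedKey o) {
            int c = key.compareTo(o.key);
            return c != 0 ? c : Long.compare(windowStart, o.windowStart);
        }
    }

    static final long WINDOW_SIZE_MS = 60_000L; // assumed window size

    static long windowStartFor(long timestamp) {
        return timestamp - (timestamp % WINDOW_SIZE_MS);
    }

    public static Map<WindowedKey, String> store() {
        Map<WindowedKey, String> store = new TreeMap<>();
        // The same GUID written in two different windows...
        store.put(new WindowedKey("11111", windowStartFor(10_000L)), "meta-a");
        store.put(new WindowedKey("11111", windowStartFor(70_000L)), "meta-b");
        return store;
    }

    public static long countForKey(Map<WindowedKey, String> store, String key) {
        return store.keySet().stream().filter(w -> w.key().equals(key)).count();
    }

    public static void main(String[] args) {
        // ...so an all()/fetchAll()-style scan sees "11111" twice,
        // because each (key, window) pair is a distinct entry.
        System.out.println(countForKey(store(), "11111"));
    }
}
```

This is expected behavior for a windowed scan, distinct from the cache/store duplication described below.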
So I'd first want to know:
1) which Kafka version you are using, and
2) why you need a window store -- and if you do, whether you could use the
single-point fetch (added in KAFKA-6560) rather than the range query, which
is also more expensive.

Guozhang

On Fri, Jun 29, 2018 at 11:38 AM, Christian Henry <
christian.henr...@gmail.com> wrote:

> Hi all,
>
> I'll first describe a simplified view of the relevant parts of our setup
> (which should be enough to repro), describe the behavior we're seeing,
> and then note some information I've come across after digging in a bit.
>
> We have a Kafka Streams application, and one of our transform steps keeps
> a state store to filter out messages with a previously seen GUID. That
> is, our transform looks like:
>
> public KeyValue<byte[], String> transform(byte[] key, String guid) {
>     try (WindowStoreIterator<DuplicateMessageMetadata> iterator =
>             duplicateStore.fetch(guid, start, now)) {
>         if (iterator.hasNext()) {
>             return null;
>         } else {
>             duplicateStore.put(guid, metadata); // metadata construction elided
>             return new KeyValue<>(key, guid);
>         }
>     }
> }
>
> where duplicateStore is a persistent windowed store with caching enabled.
>
> I was debugging some tests and found that sometimes when calling *all()*
> or *fetchAll()* on the duplicate store and stepping through the iterator,
> it would return the same GUID more than once, even though it was only
> inserted into the store once. More specifically, if I had the following
> GUIDs sent to the stream: [11111, 22222, ... 99999] (9 values total),
> sometimes it would return 10 values, with one (or more) of the values
> being returned twice by the iterator. However, this would not show up
> with a *fetch(guid)* on that specific GUID. For instance, if 11111 was
> being returned twice by *fetchAll()*, calling
> *duplicateStore.fetch("11111", start, end)* would still return an
> iterator of size 1.
>
> I dug into this a bit more by setting a breakpoint in
> *SegmentedCacheFunction#compareSegmentedKeys(cacheKey, storeKey)* and
> watching the two input values as I looped through the iterator using
> "*while (iterator.hasNext()) { print(iterator.next()); }*". In one test,
> the duplicate value was 66666, and I saw the following behavior (trimming
> the segment values off the byte input):
>
> -- compareSegmentedKeys(cacheKey = 66666, storeKey = 22222)
> -- next() returns 66666
>
> and
>
> -- compareSegmentedKeys(cacheKey = 77777, storeKey = 66666)
> -- next() returns 66666
>
> Other than those calls, the input values are the same and the output is
> as expected. Additionally, a coworker noted that the number of duplicates
> always matches the number of times *Long.compare(cacheSegmentId,
> storeSegmentId)* returns a non-zero value, indicating that the duplicates
> likely arise from the segment comparison.

--
-- Guozhang
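To illustrate the mechanism Christian's breakpoints point at, here is a minimal, self-contained simulation (plain Java -- SegKey, compareSegmented, and the merge loop are illustrative assumptions, not the actual Kafka Streams internals) of how a merged cache/store iterator that dedups only on an exact key comparison can emit a duplicate when the comparison orders by segment id first:

```java
import java.util.ArrayList;
import java.util.List;

public class SegmentMergeDemo {
    // Hypothetical composite key: segment id plus record key, mimicking how a
    // segmented store prefixes keys with a segment derived from the timestamp.
    record SegKey(long segmentId, String key) {}

    // Orders by segment first, so the same record key placed in different
    // segments compares non-zero -- the situation the coworker observed.
    static int compareSegmented(SegKey a, SegKey b) {
        int c = Long.compare(a.segmentId(), b.segmentId());
        return c != 0 ? c : a.key().compareTo(b.key());
    }

    // Merge two sorted iterations, skipping the store entry only when the
    // comparison reports equality -- the dedup step.
    public static List<String> merge(List<SegKey> cache, List<SegKey> store) {
        List<String> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < cache.size() || j < store.size()) {
            if (j >= store.size()) { out.add(cache.get(i++).key()); }
            else if (i >= cache.size()) { out.add(store.get(j++).key()); }
            else {
                int c = compareSegmented(cache.get(i), store.get(j));
                if (c == 0) { out.add(cache.get(i++).key()); j++; } // dedup hit
                else if (c < 0) { out.add(cache.get(i++).key()); }
                else { out.add(store.get(j++).key()); }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // "66666" sits in segment 1 on the cache side but segment 2 on the
        // store side, so compareSegmented() never returns 0 for it and the
        // dedup step never fires: the merge emits it twice.
        List<SegKey> cache = List.of(new SegKey(1, "22222"), new SegKey(1, "66666"));
        List<SegKey> store = List.of(new SegKey(1, "22222"), new SegKey(2, "66666"));
        System.out.println(merge(cache, store));
    }
}
```

Under these assumptions, every non-zero segment comparison for an otherwise-identical key produces exactly one extra emission, which matches the duplicate count Christian's coworker observed.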