Yes, please create a JIRA reporting this. The `all()` and `fetchAll()` source code has not been modified since it was first added to the ReadOnlyWindowStore API, so the issue is likely caused by a bug that has been lurking there since then. Please also attach your code and sample data, if possible, to help us reproduce the issue and investigate further.
Guozhang

On Fri, Jul 6, 2018 at 7:41 AM, Christian Henry <christian.henr...@gmail.com> wrote:

> Any other ideas here? Should I create a bug?
>
> On Tue, Jul 3, 2018 at 1:21 PM, Christian Henry <christian.henr...@gmail.com> wrote:
>
> > Nope, we're setting retainDuplicates to false.
> >
> > On Tue, Jul 3, 2018 at 6:55 AM, Damian Guy <damian....@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > When you create your window store, do you have `retainDuplicates` set to
> > > `true`? That is, assuming you use `Stores.persistentWindowStore(...)`, is
> > > the last parameter `true`?
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Mon, 2 Jul 2018 at 17:29 Christian Henry <christian.henr...@gmail.com> wrote:
> > >
> > > > We're using the latest Kafka (1.1.0). I'd like to note that when we
> > > > encounter duplicates, the window is the same as well.
> > > >
> > > > My original code was a bit simplified: we also insert into the store if
> > > > iterator.hasNext(), before returning null. We're using a window store
> > > > because we have a punctuator that runs every few minutes to count GUIDs
> > > > with similar metadata and report that in a healthcheck. Since our
> > > > healthcheck window is shorter than the retention period of the store
> > > > (the retention period might be 1 hour, the healthcheck window ~5 min),
> > > > the window store seemed like a good way to efficiently query all of the
> > > > most recent data. Note that since the healthcheck punctuator needs to
> > > > aggregate over all the recent values, it has to call
> > > > *fetchAll(start, end)*, which is how these duplicates affect us.
> > > >
> > > > On Fri, Jun 29, 2018 at 7:32 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > Hello Christian,
> > > > >
> > > > > Since you are calling fetch(key, start, end), I'm assuming that
> > > > > duplicateStore is a WindowStore.
> > > > > With a window store, it is possible for a single key to fall into
> > > > > multiple windows, and hence to be returned more than once from the
> > > > > WindowStoreIterator; note that the iterator returned by all() and
> > > > > fetchAll() has type KeyValueIterator<Windowed<K>, V>, so each entry
> > > > > carries its window.
> > > > >
> > > > > So I'd first want to know:
> > > > >
> > > > > 1) Which Kafka version are you using?
> > > > > 2) Why do you need a window store? If you do, could you consider using
> > > > >    the single-point fetch (added in KAFKA-6560) instead of the range
> > > > >    query (which is also more expensive)?
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Fri, Jun 29, 2018 at 11:38 AM, Christian Henry <christian.henr...@gmail.com> wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I'll first describe a simplified view of the relevant parts of our
> > > > > > setup (which should be enough to reproduce the issue), then describe
> > > > > > the behavior we're seeing, and then note some information I came
> > > > > > across after digging in a bit.
> > > > > >
> > > > > > We have a Kafka Streams application, and one of our transform steps
> > > > > > keeps a state store to filter out messages with a previously seen
> > > > > > GUID. That is, our transform looks like:
> > > > > >
> > > > > > public KeyValue<byte[], String> transform(byte[] key, String guid) {
> > > > > >     try (WindowStoreIterator<DuplicateMessageMetadata> iterator =
> > > > > >              duplicateStore.fetch(correlationId, start, now)) {
> > > > > >         if (iterator.hasNext()) {
> > > > > >             return null;
> > > > > >         } else {
> > > > > >             duplicateStore.put(correlationId, some metadata);
> > > > > >             return new KeyValue<>(key, message);
> > > > > >         }
> > > > > >     }
> > > > > > }
> > > > > >
> > > > > > where duplicateStore is a persistent window store with caching
> > > > > > enabled.
> > > > > >
> > > > > > I was debugging some tests and found that sometimes, when calling
> > > > > > *all()* or *fetchAll()* on the duplicate store and stepping through
> > > > > > the iterator, it would return the same GUID more than once, even if it
> > > > > > was only inserted into the store once. More specifically, if I sent
> > > > > > the following GUIDs to the stream: [11111, 22222, ... 99999] (9 values
> > > > > > total), it would sometimes return 10 values, with one (or more) of the
> > > > > > values being returned twice by the iterator. However, this would not
> > > > > > show up with a *fetch(guid)* on that specific GUID. For instance, if
> > > > > > 11111 was being returned twice by *fetchAll()*, calling
> > > > > > *duplicateStore.fetch("11111", start, end)* would still return an
> > > > > > iterator with a single entry.
> > > > > >
> > > > > > I dug into this a bit more by setting a breakpoint in
> > > > > > *SegmentedCacheFunction#compareSegmentedKeys(cacheKey, storeKey)* and
> > > > > > watching the two input values as I looped through the iterator using
> > > > > > "*while (iterator.hasNext()) { print(iterator.next()) }*". In one
> > > > > > test, the duplicate value was 66666, and I saw the following behavior
> > > > > > (trimming the segment values off the byte input):
> > > > > >
> > > > > > -- compareSegmentedKeys(cacheKey = 66666, storeKey = 22222)
> > > > > > -- next() returns 66666
> > > > > >
> > > > > > and
> > > > > >
> > > > > > -- compareSegmentedKeys(cacheKey = 77777, storeKey = 66666)
> > > > > > -- next() returns 66666
> > > > > >
> > > > > > Apart from those, the two input values were the same and the output
> > > > > > was as expected.
> > > > > >
> > > > > > Additionally, a coworker noted that the number of duplicates always
> > > > > > matches the number of times
> > > > > > *Long.compare(cacheSegmentId, storeSegmentId)* returns a non-zero
> > > > > > value, which suggests the duplicates arise from the segment comparison.
> > > > >
> > > > > --
> > > > > -- Guozhang

--
-- Guozhang
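The deduplicating transform in the original message refers to several names it never defines (`correlationId`, `start`, `now`, `message`, and the metadata value). As a rough sketch only, the class below models the same logic with the Java standard library instead of Kafka Streams. It assumes the GUID itself is the store key, that `start` is `now` minus a fixed lookback, and, following the later note that the store is also written when `iterator.hasNext()` is true, that the GUID is recorded in both branches. `GuidDeduplicator`, its lookback parameter and the 5-minute value are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

/**
 * Hypothetical, stdlib-only model of the GUID-deduplicating transform.
 * It is NOT Kafka Streams code: a map of timestamp sets stands in for the window store.
 */
final class GuidDeduplicator {
    private final long lookbackMs;
    // guid -> timestamps at which the GUID was recorded (no retention/expiry in this model)
    private final Map<String, NavigableSet<Long>> seen = new HashMap<>();

    GuidDeduplicator(long lookbackMs) {
        this.lookbackMs = lookbackMs;
    }

    /** Returns the GUID if it should be forwarded, or null if it was seen within the lookback. */
    String transform(String guid, long now) {
        NavigableSet<Long> times = seen.computeIfAbsent(guid, g -> new TreeSet<>());
        // Stands in for fetch(guid, start, now).hasNext() with start = now - lookbackMs
        boolean seenRecently = !times.subSet(now - lookbackMs, true, now, true).isEmpty();
        // Stands in for put(guid, metadata); assumed to happen in both branches
        times.add(now);
        return seenRecently ? null : guid;
    }

    public static void main(String[] args) {
        GuidDeduplicator dedup = new GuidDeduplicator(5 * 60_000L); // hypothetical 5-minute lookback
        System.out.println(dedup.transform("11111", 0L));       // 11111 (first sighting)
        System.out.println(dedup.transform("11111", 60_000L));  // null (duplicate within lookback)
        System.out.println(dedup.transform("11111", 600_000L)); // 11111 (last seen 540 s earlier)
    }
}
```

The point of the model is that this logic only ever asks "is there any entry for this key in the range?", which is why a per-key `fetch(guid, start, end)` is unaffected by an extra copy, while a scan such as `fetchAll(start, end)` that counts entries is.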
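Damian's `retainDuplicates` question and Guozhang's point about a key falling into several windows can be made concrete with a tiny in-memory model. This is a sketch under stated assumptions, not the Kafka Streams API: the hypothetical `WindowStoreModel` addresses entries by (key, window start), and it assumes that with `retainDuplicates = false` a second put for the same key and window overwrites the first, while with `true` both are kept. Under those assumptions a scan can legitimately return the same key twice only when it was written to two different windows; two entries with the same key and the same window, as reported in this thread, should not occur when `retainDuplicates` is `false`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model of a window store (NOT the Kafka Streams API). */
final class WindowStoreModel {
    private final boolean retainDuplicates;
    // window start -> (key -> values stored for that key in that window)
    private final TreeMap<Long, TreeMap<String, List<String>>> windows = new TreeMap<>();

    WindowStoreModel(boolean retainDuplicates) {
        this.retainDuplicates = retainDuplicates;
    }

    void put(String key, String value, long windowStart) {
        List<String> values = windows
                .computeIfAbsent(windowStart, w -> new TreeMap<>())
                .computeIfAbsent(key, k -> new ArrayList<>());
        if (!retainDuplicates) {
            values.clear(); // assumed semantics: same key + same window overwrites
        }
        values.add(value);
    }

    /** Every (key, window, value) entry whose window start lies in [timeFrom, timeTo]. */
    List<String> fetchAll(long timeFrom, long timeTo) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, TreeMap<String, List<String>>> w :
                windows.subMap(timeFrom, true, timeTo, true).entrySet()) {
            for (Map.Entry<String, List<String>> k : w.getValue().entrySet()) {
                for (String v : k.getValue()) {
                    out.add(k.getKey() + "@" + w.getKey() + "=" + v);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        WindowStoreModel noDups = new WindowStoreModel(false);
        noDups.put("11111", "meta", 0L);
        noDups.put("11111", "meta", 0L);      // same window: overwritten
        noDups.put("11111", "meta", 60_000L); // different window: kept
        System.out.println(noDups.fetchAll(0L, 60_000L)); // [11111@0=meta, 11111@60000=meta]

        WindowStoreModel withDups = new WindowStoreModel(true);
        withDups.put("11111", "meta", 0L);
        withDups.put("11111", "meta", 0L);    // same window: both kept
        System.out.println(withDups.fetchAll(0L, 0L));    // [11111@0=meta, 11111@0=meta]
    }
}
```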
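The coworker's observation about `Long.compare(cacheSegmentId, storeSegmentId)` can be illustrated with a simplified model, under loudly stated assumptions: that `fetchAll()` merges a sorted cache iterator with a sorted store iterator; that a record can sit in both the cache and the underlying store at once (for example after a flush); that both sides order entries by segment id first and key second, as the `compareSegmentedKeys` breakpoint suggests; and that an entry present on both sides is emitted once only when the comparison returns zero. If the cache and the store were to derive different segment ids for the same record (modelled here with two hypothetical segment intervals), the comparison would not return zero for that record and it would be emitted twice. `SegmentMergeModel` and the interval values are invented for illustration; this is a hypothesis consistent with the reported symptoms, not Kafka's actual code or a confirmed diagnosis.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical model (NOT Kafka's code) of merging a cache iterator with a store iterator. */
final class SegmentMergeModel {

    /** One entry as seen by one side of the merge. */
    static final class Entry {
        final long segmentId;
        final String key;

        Entry(long segmentId, String key) {
            this.segmentId = segmentId;
            this.key = key;
        }
    }

    // Segment id first, then key (assumed ordering).
    static final Comparator<Entry> ORDER =
            Comparator.comparingLong((Entry e) -> e.segmentId).thenComparing((Entry e) -> e.key);

    /** Derives segment ids from timestamps with the given interval and sorts by ORDER. */
    static List<Entry> entries(long segmentInterval, long[] timestamps, String[] keys) {
        List<Entry> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            out.add(new Entry(timestamps[i] / segmentInterval, keys[i]));
        }
        out.sort(ORDER);
        return out;
    }

    /** Merges two sorted sides; an entry is emitted once only if both sides compare equal. */
    static List<String> merge(List<Entry> cache, List<Entry> store) {
        List<String> out = new ArrayList<>();
        int c = 0, s = 0;
        while (c < cache.size() || s < store.size()) {
            if (c == cache.size()) {
                out.add(store.get(s++).key);
            } else if (s == store.size()) {
                out.add(cache.get(c++).key);
            } else {
                int cmp = ORDER.compare(cache.get(c), store.get(s));
                if (cmp == 0) {          // same entry on both sides: emit once, advance both
                    out.add(cache.get(c++).key);
                    s++;
                } else if (cmp < 0) {
                    out.add(cache.get(c++).key);
                } else {
                    out.add(store.get(s++).key);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {0L, 40_000L, 70_000L};
        String[] keys = {"22222", "66666", "77777"};
        List<Entry> store = entries(60_000L, ts, keys);
        // Matching segment ids on both sides
        System.out.println(merge(entries(60_000L, ts, keys), store)); // [22222, 66666, 77777]
        // Cache using a different (hypothetical) interval
        System.out.println(merge(entries(30_000L, ts, keys), store)); // [22222, 66666, 66666, 77777, 77777]
    }
}
```

With matching intervals every key comes out once. With mismatched intervals, 66666 and 77777 each come out twice, and in this run the segment comparison is non-zero exactly twice, loosely mirroring the pattern the coworker noticed.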