We're using the latest Kafka (1.1.0). I'd also like to note that when we
encounter a duplicate, both copies report the same window.

My original code was a bit simplified -- we also insert into the store when
iterator.hasNext() is true, before returning null. We're using a window
store because we have a punctuator that runs every few minutes to count
GUIDs with similar metadata, and reports that in a healthcheck. Since our
healthcheck window is less than the retention period of the store
(retention period might be 1 hour, healthcheck window is ~5 min), the
window store seemed like a good way to efficiently query all of the most
recent data. Note that since the healthcheck punctuator needs to aggregate
over all the recent values, it has to do a *fetchAll(start, end)*, which is
how these duplicates are affecting us.
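
In case it helps frame the impact, here is a minimal sketch of how the
punctuator could drop the repeated entries before counting, keyed on
(guid, window start), since the duplicates we see share the same window.
The Entry class and the dedupe helper are hypothetical stand-ins for the
KeyValue<Windowed<K>, V> pairs returned by fetchAll(); this is not our
production code, and it only hides the symptom.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FetchAllDedup {
    // Hypothetical stand-in for one KeyValue<Windowed<String>, V> from fetchAll().
    static final class Entry {
        final String guid;
        final long windowStartMs;
        final String metadata;

        Entry(String guid, long windowStartMs, String metadata) {
            this.guid = guid;
            this.windowStartMs = windowStartMs;
            this.metadata = metadata;
        }
    }

    // Keeps the first entry seen for each (guid, window start) pair,
    // preserving the order in which the iterator returned them.
    static List<Entry> dedupe(Iterable<Entry> entries) {
        Map<List<Object>, Entry> firstSeen = new LinkedHashMap<>();
        for (Entry e : entries) {
            firstSeen.putIfAbsent(Arrays.<Object>asList(e.guid, e.windowStartMs), e);
        }
        return new ArrayList<>(firstSeen.values());
    }

    public static void main(String[] args) {
        List<Entry> raw = Arrays.asList(
                new Entry("11111", 0L, "a"),
                new Entry("22222", 0L, "b"),
                new Entry("11111", 0L, "a")); // same guid and window, returned twice
        System.out.println(dedupe(raw).size()); // prints 2
    }
}
```

In the real punctuator the loop would run over the KeyValueIterator from
fetchAll(start, end) inside a try-with-resources block.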

On Fri, Jun 29, 2018 at 7:32 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Christian,
>
> Since you are calling fetch(key, start, end) I'm assuming that
> duplicateStore is a WindowedStore. With a windowed store, it is possible
> that a single key can fall into multiple windows, and hence be returned
> from the WindowStoreIterator; note its type is <Windowed<K>, V>.
>
> So I'd first want to know
>
> 1) which Kafka version are you using.
> 2) why you'd need a window store, and if yes, could you consider using the
> single point fetch (added in KAFKA-6560) rather than the range query (which
> is more expensive as well).
>
>
>
> Guozhang
>
>
> On Fri, Jun 29, 2018 at 11:38 AM, Christian Henry <
> christian.henr...@gmail.com> wrote:
>
> > Hi all,
> >
> > I'll first describe a simplified view of relevant parts of our setup
> > (which should be enough to repro), describe the behavior we're seeing,
> > and then note some information I've come across after digging in a bit.
> >
> > We have a Kafka Streams application, and one of our transform steps
> > keeps a state store to filter out messages with a previously seen GUID.
> > That is, our transform looks like:
> >
> > public KeyValue<byte[], String> transform(byte[] key, String guid) {
> >     try (WindowStoreIterator<DuplicateMessageMetadata> iterator =
> >             duplicateStore.fetch(correlationId, start, now)) {
> >         if (iterator.hasNext()) {
> >             return null;
> >         } else {
> >             duplicateStore.put(correlationId, some metadata);
> >             return new KeyValue<>(key, message);
> >         }
> >     }
> > }
> >
> > where the duplicateStore is a persistent windowed store with caching
> > enabled.
> >
> > I was debugging some tests and found that sometimes when calling *all()*
> > or *fetchAll()* on the duplicate store and stepping through the iterator,
> > it would return the same guid more than once, even if it was only
> > inserted into the store
> > once. More specifically, if I had the following guids sent to the stream:
> > [11111, 22222, ... 99999] (for 9 values total), sometimes it would return
> > 10 values, with one (or more) of the values being returned twice by the
> > iterator. However, this would not show up with a *fetch(guid)* on that
> > specific guid. For instance, if 11111 was being returned twice by
> > *fetchAll()*, calling *duplicateStore.fetch("11111", start, end)* will
> > still return an iterator with size of 1.
> >
> > I dug into this a bit more by setting a breakpoint in
> > *SegmentedCacheFunction#compareSegmentedKeys(cacheKey, storeKey)* and
> > watching the two input values as I looped through the iterator using
> > "*while(iterator.hasNext()) { print(iterator.next()) }*". In one test,
> > the duplicate value was 66666, and I saw the following behavior
> > (trimming off the segment values from the byte input):
> > -- compareSegmentedKeys(cacheKey = 66666, storeKey = 22222)
> > -- next() returns 66666
> > and
> > -- compareSegmentedKeys(cacheKey = 77777, storeKey = 66666)
> > -- next() returns 66666
> > Besides those, the input values are the same and the output is as
> > expected. Additionally, a coworker noted that the number of duplicates
> > always matches the number of times *Long.compare(cacheSegmentId,
> > storeSegmentId)* returns a non-zero value, indicating that duplicates are
> > likely arising due to the segment comparison.
> >
>
>
>
> --
> -- Guozhang
>
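
To make the compareSegmentedKeys trace quoted above more concrete, here is a
toy model of a merge over a cache iterator and a store iterator. Everything
in it is an assumption for illustration: the MergeModel class, the idea that
the cache iterates in (segment, key) order while the store iterates in plain
key order, and the choice to put 66666 in an earlier segment. It is not taken
from the Kafka Streams source. It only shows that when the comparator looks
at the segment first, two copies of one key may never meet head to head, so
both get emitted.

```java
import java.util.ArrayList;
import java.util.List;

class MergeModel {
    static final class Entry {
        final long segmentId;
        final String key;

        Entry(long segmentId, String key) {
            this.segmentId = segmentId;
            this.key = key;
        }
    }

    // Segment first, then key: the shape of comparison described in the thread.
    static int compareSegmentThenKey(Entry a, Entry b) {
        int bySegment = Long.compare(a.segmentId, b.segmentId);
        return bySegment != 0 ? bySegment : a.key.compareTo(b.key);
    }

    // Assumed store order: plain key order; 66666 sits in segment 0, the rest in 1.
    static List<Entry> storeInKeyOrder() {
        List<Entry> store = new ArrayList<>();
        for (int i = 1; i <= 9; i++) {
            store.add(new Entry(i == 6 ? 0L : 1L, String.valueOf(i * 11111)));
        }
        return store;
    }

    // Assumed cache order: same entries, sorted by (segment, key).
    static List<Entry> cacheInSegmentOrder(List<Entry> store) {
        List<Entry> cache = new ArrayList<>(store);
        cache.sort(MergeModel::compareSegmentThenKey);
        return cache;
    }

    // Two-way merge: equal heads are treated as one record and emitted once.
    static List<String> merge(List<Entry> cache, List<Entry> store) {
        List<String> out = new ArrayList<>();
        int c = 0;
        int s = 0;
        while (c < cache.size() || s < store.size()) {
            if (c == cache.size()) {
                out.add(store.get(s++).key);
            } else if (s == store.size()) {
                out.add(cache.get(c++).key);
            } else {
                int r = compareSegmentThenKey(cache.get(c), store.get(s));
                if (r < 0) {
                    out.add(cache.get(c++).key);
                } else if (r > 0) {
                    out.add(store.get(s++).key);
                } else {
                    out.add(cache.get(c++).key);
                    s++;
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Entry> store = storeInKeyOrder();
        List<Entry> cache = cacheInSegmentOrder(store);
        // Mismatched orders: 10 values come back, with 66666 appearing twice.
        System.out.println(merge(cache, store));
        // Both sides in the same order: 9 values, no duplicate.
        System.out.println(merge(cache, cache).size());
    }
}
```

In this model the second copy of 66666 is emitted right after a comparison
of cacheKey = 77777 against storeKey = 66666, which is the same pattern as
the second pair of lines in the trace.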
