Hello Christian,

Since you are calling fetch(key, start, end), I'm assuming that duplicateStore
is a windowed store. With a windowed store, a single key can fall into
multiple windows, and hence be returned more than once as you iterate over
the store; note that the iterator returned by all() / fetchAll() is typed
KeyValueIterator<Windowed<K>, V>.
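
For example, iterating over all() / fetchAll() you would see the same
underlying key once per window it falls into (a rough sketch, assuming
duplicateStore is a WindowStore<String, DuplicateMessageMetadata>):

    try (KeyValueIterator<Windowed<String>, DuplicateMessageMetadata> iter = duplicateStore.all()) {
        while (iter.hasNext()) {
            KeyValue<Windowed<String>, DuplicateMessageMetadata> entry = iter.next();
            // the same entry.key.key() can legitimately appear for different entry.key.window() ranges
            System.out.println(entry.key.key() + " @ " + entry.key.window().start());
        }
    }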

So I'd first want to know

1) Which Kafka version are you using?
2) Why do you need a window store, and if you do, could you consider using
the single-point fetch (added in KAFKA-6560) rather than the range query
(which is also more expensive)? See the sketch below.
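
To illustrate the difference (a rough sketch against your variable names;
here windowStartTimestamp stands in for whatever window start you would look
up, and the single-key fetch(key, time) variant only exists in versions that
include KAFKA-6560):

    // Range query: scans every window for the key that overlaps [start, now].
    try (WindowStoreIterator<DuplicateMessageMetadata> iter =
            duplicateStore.fetch(guid, start, now)) {
        boolean seenViaRange = iter.hasNext();
    }

    // Single-point lookup: reads the key from the single window starting at
    // windowStartTimestamp, avoiding the range scan.
    DuplicateMessageMetadata existing = duplicateStore.fetch(guid, windowStartTimestamp);
    boolean seenViaPoint = (existing != null);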



Guozhang


On Fri, Jun 29, 2018 at 11:38 AM, Christian Henry <
christian.henr...@gmail.com> wrote:

> Hi all,
>
> I'll first describe a simplified view of relevant parts of our setup (which
> should be enough to repro), describe the behavior we're seeing, and then
> note some information I've come across after digging in a bit.
>
> We have a kafka stream application, and one of our transform steps keeps a
> state store to filter out messages with a previously seen GUID. That is,
> our transform looks like:
>
> public KeyValue<byte[], String> transform(byte[] key, String guid) {
>     try (WindowStoreIterator<DuplicateMessageMetadata> iterator =
>             duplicateStore.fetch(guid, start, now)) {
>         if (iterator.hasNext()) {
>             // guid already seen within [start, now]: drop the record
>             return null;
>         } else {
>             duplicateStore.put(guid, someMetadata);  // someMetadata: dedup info for this guid
>             return new KeyValue<>(key, guid);
>         }
>     }
> }
>
> where the duplicateStore is a persistent windowed store with caching
> enabled.
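>
> For concreteness, the store is built roughly like this (the names and the
> retention / segment / window-size values here are illustrative, not our
> exact config):
>
>     StoreBuilder<WindowStore<String, DuplicateMessageMetadata>> duplicateStoreBuilder =
>         Stores.windowStoreBuilder(
>                 Stores.persistentWindowStore("duplicate-store", retentionMs, numSegments, windowSizeMs, false),
>                 Serdes.String(),
>                 duplicateMetadataSerde)   // custom serde for DuplicateMessageMetadata
>             .withCachingEnabled();        // caching enabled, as noted above
>     builder.addStateStore(duplicateStoreBuilder);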
>
> I was debugging some tests and found that sometimes when calling *all()* or
> *fetchAll()* on the duplicate store and stepping through the iterator, it
> would return the same guid more than once, even if it was only inserted into
> the store once. More specifically, if I had the following guids sent to the
> stream: [11111, 22222, ... 99999] (for 9 values total), sometimes it would
> return 10 values, with one (or more) of the values being returned twice by
> the iterator. However, this would not show up with a *fetch(guid)* on that
> specific guid. For instance, if 11111 was being returned twice by
> *fetchAll()*, calling *duplicateStore.fetch("11111", start, end)* would
> still return an iterator of size 1.
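>
> A rough sketch of that check (start / end are the same bounds as above):
>
>     int total = 0;
>     try (KeyValueIterator<Windowed<String>, DuplicateMessageMetadata> all =
>             duplicateStore.fetchAll(start, end)) {
>         while (all.hasNext()) {
>             System.out.println(all.next().key);  // the same guid can show up twice here
>             total++;                             // sometimes 10 for only 9 distinct guids
>         }
>     }
>
>     int count = 0;
>     try (WindowStoreIterator<DuplicateMessageMetadata> single =
>             duplicateStore.fetch("11111", start, end)) {
>         while (single.hasNext()) {
>             single.next();
>             count++;                             // still 1, even when fetchAll() returned 11111 twice
>         }
>     }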
>
> I dug into this a bit more by setting a breakpoint in
> *SegmentedCacheFunction#compareSegmentedKeys(cacheKey,
> storeKey)* and watching the two input values as I looped through the
> iterator using "*while (iterator.hasNext()) { print(iterator.next()); }*". In
> one test, the duplicate value was 66666, and I saw the following behavior
> (trimming the segment values off the byte input):
> -- compareSegmentedKeys(cacheKey = 66666, storeKey = 22222)
> -- next() returns 66666
> and
> -- compareSegmentedKeys(cacheKey = 77777, storeKey = 66666)
> -- next() returns 66666
> Besides those, the input values are the same and the output is as expected.
> Additionally, a coworker noted that the number of duplicates always matches
> the number of times *Long.compare(cacheSegmentId, storeSegmentId)* returns
> a non-zero value, indicating that the duplicates likely arise from the
> segment comparison.
>



