Hi all,

I'll first describe a simplified view of relevant parts of our setup (which
should be enough to repro), describe the behavior we're seeing, and then
note some information I've come across after digging in a bit.

We have a Kafka Streams application, and one of our transform steps keeps a
state store to filter out messages with a previously seen GUID. That is,
our transform looks like:

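public KeyValue<byte[], String> transform(byte[] key, String guid) {
    // duplicateStore, start and now are set up elsewhere in the transformer
    try (WindowStoreIterator<DuplicateMessageMetadata> iterator =
             duplicateStore.fetch(guid, start, now)) {
        if (iterator.hasNext()) {
            // GUID already seen within [start, now]: drop the record
            return null;
        }
        // First time we see this GUID: remember it and forward the record
        duplicateStore.put(guid, metadata); // metadata construction elided
        return new KeyValue<>(key, guid);
    }
}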

where the duplicateStore is a persistent windowed store with caching
enabled.

I was debugging some tests and found that sometimes, when calling
*all()* or *fetchAll()* on the duplicate store and stepping through the
iterator, it would return the same GUID more than once, even though that
GUID had been inserted into the store only once. More specifically, if I
sent the following GUIDs to the stream: [11111, 22222, ..., 99999] (9
values in total), the iterator would sometimes return 10 (or more) values,
with one or more of the GUIDs returned twice. However, this did not show up
with a *fetch(guid)* on that specific GUID. For instance, if *fetchAll()*
returned 11111 twice, calling *duplicateStore.fetch("11111", start, end)*
still returned an iterator with exactly one entry.
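For a test, one way to catch this symptom directly is to record every key the iterator yields and report the repeats. The sketch below is a plain-Java helper; the `DuplicateCheck` class and `repeatedKeys` method are hypothetical names, and the key sequence in `main` is made up to match the pattern described above. In a real test the keys would come from `iterator.next().key.key()` on the `KeyValueIterator<Windowed<String>, V>` returned by `all()` or `fetchAll()`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

public class DuplicateCheck {
    // Collects each key yielded after its first occurrence,
    // so a key seen three times appears twice in the result.
    static List<String> repeatedKeys(Iterator<String> keys) {
        Set<String> seen = new HashSet<>();
        List<String> repeats = new ArrayList<>();
        while (keys.hasNext()) {
            String key = keys.next();
            if (!seen.add(key)) { // add() returns false if already present
                repeats.add(key);
            }
        }
        return repeats;
    }

    public static void main(String[] args) {
        // Hypothetical keys, shaped like a fetchAll() run where 66666 repeats
        List<String> keys = List.of("66666", "22222", "66666", "77777");
        System.out.println(repeatedKeys(keys.iterator())); // prints [66666]
    }
}
```

Asserting that `repeatedKeys(...)` is empty after iterating `fetchAll()` turns the intermittent symptom into a failing test rather than something spotted by stepping through the iterator.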

I dug into this a bit more by setting a breakpoint in
*SegmentedCacheFunction#compareSegmentedKeys(cacheKey, storeKey)* and
watching the two input values as I looped through the iterator using
"*while (iterator.hasNext()) { print(iterator.next()) }*". In one test, the
duplicated value was 66666, and I saw the following behavior (with the
segment values trimmed off the byte input):
-- compareSegmentedKeys(cacheKey = 66666, storeKey = 22222)
-- next() returns 66666
and
-- compareSegmentedKeys(cacheKey = 77777, storeKey = 66666)
-- next() returns 66666
Apart from those two calls, the cache key and store key were always the
same, and the output was as expected.
Additionally, a coworker noted that the number of duplicates always matches
the number of times *Long.compare(cacheSegmentId, storeSegmentId)* returns
a non-zero value, which suggests that the duplicates come from the segment
comparison.
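To make the suspected mechanism concrete, here is a simplified, stdlib-only model of merging a sorted cache iterator with a sorted store iterator, comparing segment ID first and key second (the order the comparator appears to use, going by the Long.compare call above). It is not Kafka's actual merge iterator: the `MergeSketch` class, the `Entry` record, and the segment IDs are all hypothetical. If the same key is assigned segment 0 on the cache side but segment 1 on the store side, the comparison never returns 0 for it, so it is emitted once from each side, which mirrors the two calls seen in the debugger.

```java
import java.util.ArrayList;
import java.util.List;

public class MergeSketch {
    // Hypothetical simplified entry: a segment id plus a record key
    record Entry(long segmentId, String key) {}

    // Segment id first, then key
    static int compare(Entry cache, Entry store) {
        int bySegment = Long.compare(cache.segmentId(), store.segmentId());
        if (bySegment != 0) {
            return bySegment;
        }
        return cache.key().compareTo(store.key());
    }

    // Merges two individually sorted lists; entries that compare equal
    // are emitted once, taking the cache side and advancing both.
    static List<String> merge(List<Entry> cache, List<Entry> store) {
        List<String> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < cache.size() || j < store.size()) {
            if (i == cache.size()) {
                out.add(store.get(j++).key());
            } else if (j == store.size()) {
                out.add(cache.get(i++).key());
            } else {
                int c = compare(cache.get(i), store.get(j));
                if (c == 0) {
                    out.add(cache.get(i).key());
                    i++;
                    j++;
                } else if (c < 0) {
                    out.add(cache.get(i++).key());
                } else {
                    out.add(store.get(j++).key());
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // 66666 sits in segment 0 in the cache but segment 1 in the store
        List<Entry> cache = List.of(new Entry(0, "66666"), new Entry(1, "77777"));
        List<Entry> store = List.of(new Entry(1, "22222"), new Entry(1, "66666"));
        System.out.println(merge(cache, store)); // prints [66666, 22222, 66666, 77777]
    }
}
```

If the cache entry for 66666 is placed in segment 1 instead, the two 66666 entries compare equal and the same merge yields [22222, 66666, 77777], so in this model the duplicate appears only when the two sides disagree on the segment for the same key.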
