[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17101215#comment-17101215
 ] 

Georgi Petkov commented on KAFKA-9921:
--------------------------------------

Maybe we can add the same information to the WindowStore#put method as well.

It's a personal style preference, but if there are short-circuit checks for
cases where the implementation is trivial or empty, I would put them at the
beginning of the method instead of adding more branching to the rest of the
logic. So I would write:
{code:java}
if (some corner case) {
    doSomething();
    return;
}{code}
instead of:
{code:java}
if (some corner case) {
    doSomething();
} else {
    // nested code
    if (...) {
        ...
    } else {
        ...
    }
}{code}
It's a bit hard to explain in words. See
[this|https://softwareengineering.stackexchange.com/questions/18454/should-i-return-from-a-function-early-or-use-an-if-statement]
question and also InMemoryWindowStore#put.
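To make the early-return suggestion concrete, here is a sketch of a window-store put method written with guard clauses. Everything in it is hypothetical: GuardClauseWindowStore is not Kafka's InMemoryWindowStore, the "windowStart@key" slot keys are a simplification, and treating a null value as a no-op when duplicates are retained is an assumed semantic chosen for illustration.

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, heavily simplified window store; NOT Kafka's InMemoryWindowStore.
// It only illustrates handling the corner cases up front with early returns.
class GuardClauseWindowStore {
    private final boolean retainDuplicates;
    private final long retentionPeriod;
    private long observedStreamTime = Long.MIN_VALUE;
    // "windowStart@key" -> values; a list so that duplicates can be retained.
    private final Map<String, List<String>> entries = new TreeMap<>();

    GuardClauseWindowStore(boolean retainDuplicates, long retentionPeriod) {
        this.retainDuplicates = retainDuplicates;
        this.retentionPeriod = retentionPeriod;
    }

    void put(String key, String value, long windowStart) {
        observedStreamTime = Math.max(observedStreamTime, windowStart);
        final String slot = windowStart + "@" + key;

        // Corner case: the window is already past retention, so drop the record.
        if (windowStart <= observedStreamTime - retentionPeriod) {
            return;
        }
        // Corner case (assumed semantics): with duplicates retained, a null value
        // cannot say which duplicate to delete, so it is ignored.
        if (value == null && retainDuplicates) {
            return;
        }
        // Corner case: without duplicates, a null value deletes the entry.
        if (value == null) {
            entries.remove(slot);
            return;
        }
        // Main path, with no nested branches left.
        if (!retainDuplicates) {
            entries.remove(slot); // overwrite semantics
        }
        entries.computeIfAbsent(slot, s -> new ArrayList<>()).add(value);
    }

    List<String> fetch(String key, long windowStart) {
        return new ArrayList<>(
                entries.getOrDefault(windowStart + "@" + key, Collections.emptyList()));
    }

    public static void main(String[] args) {
        GuardClauseWindowStore store = new GuardClauseWindowStore(true, 100);
        store.put("k", "1", 0);
        store.put("k", "2", 0);
        store.put("k", null, 0); // ignored under the assumed semantics
        System.out.println(store.fetch("k", 0)); // prints [1, 2]
    }
}
{code}

Each corner case exits early, so the last lines of put handle only the ordinary insert, without an else branch wrapping the main logic.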

I haven't checked, but I would guess that the behavior with null values and
retainDuplicates has no explicit tests. If that is the case, you could add some.

> Caching is not working properly with WindowStateStore when retaining 
> duplicates
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-9921
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9921
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.5.0
>            Reporter: Georgi Petkov
>            Assignee: Sophie Blee-Goldman
>            Priority: Major
>             Fix For: 2.6.0, 2.5.1
>
>
> I'm using the latest version, 2.5.0, but this is not a new problem.
> I have a _WindowStateStore_ configured as follows (where _true_ stands for
> the _retainDuplicates_ parameter):
>  _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
> retentionPeriod, windowSize, *true*), keySerde, 
> valueSerde)*.withCachingEnabled()*)_
> If I put 4 key-value pairs with the same key and the values *1, 2, 3, 4*, in
> that order, then reading them back through the iterator gives the values
> *4, 2, 3, 4*.
> I've done a bit of investigation myself, and the problem is that *the whole
> caching feature is written without considering the case where duplicates
> are retained*.
> The observed behavior happens because the cache holds the last value (and it
> can hold only one, since it is not aware of the _retainDuplicates_ option),
> and that value is read first while the first entry from the RocksDB iterator
> is skipped, even though the values are different. This can be seen (in
> version 2.5.0) in _AbstractMergedSortedCacheStoreIterator#next()_ lines
> 95-97. The next 3 values are then read from the RocksDB iterator, so they are
> as expected.
> As I said, the whole feature does not consider the _retainDuplicates_
> option, so there are other examples of incorrect behavior, such as in
> _AbstractMergedSortedCacheStoreIterator#peekNextKey()_: each call skips one
> duplicate entry in the RocksDB iterator for the given key.
> In my use case, I want to persist a list of values for a given key without
> making the cost of handling a single event linear in the list size (which
> would be the case if I always read the current list, appended one value, and
> wrote it back). So I use _List<KeyValuePair<K, V>>_ instead of
> _KeyValuePair<K, List<V>>_. The whole use case is more complex than that, so I
> use _#transformValues_ and state stores.
> As a result, I can't use caching on my state stores. Others will get
> incorrect behavior that may take a long time to discover, and even longer to
> fix the affected results.
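The quoted description attributes the *4, 2, 3, 4* result to the merge in AbstractMergedSortedCacheStoreIterator#next(): when the cache and the RocksDB store hold the same key, the cached value is returned and the first store entry is skipped. The sketch below is a hypothetical, simplified model of that rule (plain String keys, at most one cached value per key; the class name MergedIteratorModel is made up), not Kafka's code. Under those assumptions it reproduces the reported output for the values 1, 2, 3, 4.

{code:java}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of a cache/store merge; NOT Kafka's
// AbstractMergedSortedCacheStoreIterator. Keys are plain Strings.
class MergedIteratorModel {

    private static <T> T advance(Iterator<T> it) {
        return it.hasNext() ? it.next() : null;
    }

    // store: entries in key order, possibly with duplicate keys (retained duplicates).
    // cache: at most one value per key, as described in the report.
    static List<String> mergedValues(List<Map.Entry<String, String>> store,
                                     TreeMap<String, String> cache) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<String, String>> storeIt = store.iterator();
        Iterator<Map.Entry<String, String>> cacheIt = cache.entrySet().iterator();
        Map.Entry<String, String> nextStore = advance(storeIt);
        Map.Entry<String, String> nextCache = advance(cacheIt);

        while (nextStore != null || nextCache != null) {
            int cmp = nextCache == null ? 1
                    : nextStore == null ? -1
                    : nextCache.getKey().compareTo(nextStore.getKey());
            if (cmp < 0) {            // the cache has the smaller key
                out.add(nextCache.getValue());
                nextCache = advance(cacheIt);
            } else if (cmp > 0) {     // the store has the smaller key
                out.add(nextStore.getValue());
                nextStore = advance(storeIt);
            } else {
                // Equal keys: the cached value is treated as the newer version of the
                // store entry, so that store entry is skipped. With retained duplicates
                // this hides a distinct value instead of an outdated one.
                out.add(nextCache.getValue());
                nextCache = advance(cacheIt);
                nextStore = advance(storeIt);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> store = List.of(
                Map.entry("k", "1"), Map.entry("k", "2"),
                Map.entry("k", "3"), Map.entry("k", "4"));
        TreeMap<String, String> cache = new TreeMap<>(Map.of("k", "4")); // only the last put
        System.out.println(mergedValues(store, cache)); // prints [4, 2, 3, 4]
    }
}
{code}

Without duplicates, skipping the store entry is correct, because the cached value really is the newer version of that single entry. With duplicates, the store holds several distinct values under the same key and only the first one gets hidden.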
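The quoted description also says that every call to AbstractMergedSortedCacheStoreIterator#peekNextKey() skips one duplicate entry in the RocksDB iterator for the given key. Below is a minimal model of that claim, again with hypothetical names (PeekSkipModel) and plain String keys instead of Kafka's types: peeking at a key that is both cached and stored consumes the head of the store, so a call that should be read-only changes what the iterator returns later.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

// Hypothetical model of the reported peekNextKey() side effect; NOT Kafka's code.
class PeekSkipModel {
    private final Deque<Map.Entry<String, String>> store;
    private final String cachedKey; // a single cached key, for simplicity

    PeekSkipModel(List<Map.Entry<String, String>> storeEntries, String cachedKey) {
        this.store = new ArrayDeque<>(storeEntries);
        this.cachedKey = cachedKey;
    }

    String peekNextKey() {
        Map.Entry<String, String> head = store.peekFirst();
        if (head == null) return cachedKey;
        if (cachedKey == null) return head.getKey();
        int cmp = cachedKey.compareTo(head.getKey());
        if (cmp == 0) {
            // Modeled on the report: on equal keys the store entry is consumed so
            // the cache entry shadows it, and this happens again on every call.
            store.pollFirst();
            return cachedKey;
        }
        return cmp < 0 ? cachedKey : head.getKey();
    }

    List<String> remainingStoreValues() {
        List<String> values = new ArrayList<>();
        for (Map.Entry<String, String> e : store) values.add(e.getValue());
        return values;
    }

    public static void main(String[] args) {
        PeekSkipModel it = new PeekSkipModel(
                List.of(Map.entry("k", "1"), Map.entry("k", "2"), Map.entry("k", "3")), "k");
        it.peekNextKey();
        it.peekNextKey();
        System.out.println(it.remainingStoreValues()); // prints [3]
    }
}
{code}

Two peeks at the same key leave only one of the three stored duplicates, which is the kind of silent data loss the report describes.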
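The use-case paragraph in the quoted description argues that keeping one entry per value (_List<KeyValuePair<K, V>>_) avoids the linear per-event cost of reading, extending, and rewriting a whole list (_KeyValuePair<K, List<V>>_). The sketch below counts the values written under each approach, assuming (as a simplification) that write and serialization cost are proportional to the number of values written; AppendCostModel and its methods are hypothetical names.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical cost model: counts how many values are (re)written per approach.
class AppendCostModel {

    // One entry per key holding the whole list: read it, append, write it all back.
    static long readModifyWrite(int events) {
        Map<String, List<Integer>> store = new HashMap<>();
        long written = 0;
        for (int i = 0; i < events; i++) {
            List<Integer> current = new ArrayList<>(store.getOrDefault("k", List.of()));
            current.add(i);
            store.put("k", current);
            written += current.size(); // the entire list is written again
        }
        return written;
    }

    // One entry per value with duplicates retained: each event writes one value.
    static long appendDuplicates(int events) {
        List<Map.Entry<String, Integer>> store = new ArrayList<>();
        long written = 0;
        for (int i = 0; i < events; i++) {
            store.add(Map.entry("k", i));
            written += 1;
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(readModifyWrite(1000));  // prints 500500
        System.out.println(appendDuplicates(1000)); // prints 1000
    }
}
{code}

For 1,000 events on one key, the read-modify-write approach writes 1 + 2 + ... + 1000 = 500,500 values, versus 1,000 for appended duplicates, which is why retaining duplicates (and therefore a cache that handles them) matters for this use case.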



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
