[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102168#comment-17102168
 ] 

Sophie Blee-Goldman commented on KAFKA-9921:
--------------------------------------------

No need to explain, I actually agree with you in general when it comes to 
returning early vs. if/else. But the general consensus seems to be against 
returning early, speaking from experience (I've been "asked" to switch to 
if/else in Streams code review before ;)) Anyway, the current code base follows 
the if/else pattern, and consistency is more important than any particular 
style issue imho. 

I'm a bit hesitant to add this to the WindowStore#put API, because technically 
the WindowStore is, and should be, completely agnostic to the presence of 
duplicates. Put just guarantees that, for the given key, it will insert the 
value, or delete that key's entry if the value is null. The "trick" is that 
this key gets wrapped up with a unique id in the case of retainDuplicates, so 
the final #put call is never on the same key. In the case of retainDuplicates 
and null values, we know the delete will be a no-op, so we can just skip it; 
but that's really an optimization rather than any specific or special behavior.
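To make the key-wrapping idea concrete, here is a toy sketch (not Kafka's actual implementation; the `@`-suffix encoding and class name are made up for illustration) of how retainDuplicates can work: each put wraps the key with a fresh sequence number, so every insert lands on a distinct composite key, and a null-value put can never match an existing entry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model of a duplicate-retaining store: a sorted map keyed by
// "key@<seqnum>", where the sequence number makes every put unique.
class DuplicateKeySketch {
    private final TreeMap<String, Integer> store = new TreeMap<>();
    private int seqnum = 0;

    void put(String key, Integer value) {
        if (value == null) {
            // The wrapped key would be brand new, so a delete could never
            // match an existing entry: skipping it is purely an optimization.
            return;
        }
        // Fixed-width sequence number keeps composite keys in insert order.
        store.put(key + "@" + String.format("%010d", seqnum++), value);
    }

    List<Integer> fetch(String key) {
        // All duplicates for `key` share the "key@" prefix.
        return new ArrayList<>(store.subMap(key + "@", key + "@\uffff").values());
    }

    public static void main(String[] args) {
        DuplicateKeySketch sketch = new DuplicateKeySketch();
        sketch.put("k", 1);
        sketch.put("k", 2);
        sketch.put("k", null); // no-op under retainDuplicates
        sketch.put("k", 3);
        System.out.println(sketch.fetch("k")); // prints [1, 2, 3]
    }
}
```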

It's not very (or at all) straightforward, and ideally we would not even have 
this option but a separate store implementation entirely for the duplicates 
case. That's a much bigger scope of work, but maybe there are some small steps 
in the right direction we can take now. For example, instead of just hacking 
the duplicates in as a boolean switch, we could split that out into another 
store wrapper, just like the cache, logging, metrics, etc. Then the 
DuplicatesWindowStore (any better name suggestions?) could do the key wrapping 
before delegating to the underlying store, which would let us remove the hacky 
`retainDuplicates` flag. That seems like it would make the duplicate handling 
logic more explicit, and we can describe it further in the javadocs if 
necessary. WDYT?
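A minimal sketch of what that wrapper layer could look like. The interface and names here are hypothetical, not Kafka's real API: the wrapper does the key wrapping itself and delegates to a plain inner store, which then needs no retainDuplicates flag at all.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for the window store interface.
interface SimpleWindowStore<K, V> {
    void put(K key, V value, long windowStart);
}

// The proposed wrapper: owns the duplicate-handling logic, delegates
// everything else to the inner store (like the caching/logging/metrics
// wrappers do today).
class DuplicatesWindowStore<V> implements SimpleWindowStore<String, V> {
    private final SimpleWindowStore<String, V> inner;
    private final AtomicInteger seqnum = new AtomicInteger();

    DuplicatesWindowStore(SimpleWindowStore<String, V> inner) {
        this.inner = inner;
    }

    @Override
    public void put(String key, V value, long windowStart) {
        if (value == null) {
            return; // a delete can never match a freshly wrapped key
        }
        // Make the key unique so the inner store retains every duplicate.
        inner.put(key + "@" + seqnum.getAndIncrement(), value, windowStart);
    }
}
```

The design win is that the inner store's contract stays a plain "one value per key" put, and only the wrapper knows duplicates exist.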

Feel free to comment on the PR directly btw. Thanks for the feedback!

> Caching is not working properly with WindowStateStore when retaining 
> duplicates
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-9921
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9921
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.5.0
>            Reporter: Georgi Petkov
>            Assignee: Sophie Blee-Goldman
>            Priority: Major
>             Fix For: 2.6.0, 2.5.1
>
>
> I'm using the current latest version 2.5.0 but this is not something new.
> I have _WindowStateStore_ configured as follows (where _true_ stands for 
> the _retainDuplicates_ parameter):
>  _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
> retentionPeriod, windowSize, *true*), keySerde, 
> valueSerde)*.withCachingEnabled()*)_
> If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that 
> order when reading them through the iterator I'll get the values *4, 2, 3, 4*.
>  I've done a bit of investigation myself and the problem is that *the whole 
> caching feature is written without consideration of the case where duplicates 
> are retained*.
> The observed behavior occurs because the cache holds only the last value (it 
> can keep just one per key, since it is not aware of the retainDuplicates 
> option) and that value is read first, while the first entry from the RocksDB 
> iterator is skipped even though the values are different. This can be 
> observed (for version 2.5.0) in 
> _AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. The next 3 
> values are then read from the RocksDB iterator, so they are as expected.
> As I said, the whole feature is not considering the _retainDuplicates_ option 
> so there are other examples of incorrect behavior like in 
> _AbstractMergedSortedCacheStoreIterator__#peekNextKey()_ - for each call, you 
> would skip one duplicate entry in the RocksDB iterator for the given key.
> In my use case, I want to persist a list of values for a given key without 
> increasing the complexity to linear for a single event (which would be the 
> case if I were always reading the current list, appending one value, and 
> writing it back). So I go for _List<KeyValuePair<K, V>>_ instead of 
> _KeyValuePair<K, List<V>>_. The whole use case is more complex than that, so 
> I use _#transformValues_ and state stores.
> As an impact, I can't use caching on my state stores. Others will get 
> incorrect behavior that may take a lot of time to discover and even more 
> time to fix the results.
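For illustration, the skip described in the report can be simulated outside of Kafka. This is a toy model, not the actual AbstractMergedSortedCacheStoreIterator code: the cache holds only the last write per key, the merged iterator emits the cached value and skips one matching store entry, and with duplicates that skipped entry held a different value.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy reproduction of the reported misbehavior: the merged cache/store
// iterator prefers the cached value and skips one store entry with the
// same key, dropping a duplicate's distinct value.
class MergedIteratorBug {
    static List<Integer> read(int cachedValue, List<Integer> storeValues) {
        List<Integer> result = new ArrayList<>();
        Iterator<Integer> store = storeValues.iterator();
        // Cache wins on an equal key: emit the cached value first...
        result.add(cachedValue);
        // ...and skip one store entry for that key, even though with
        // duplicates its value differs from the cached one.
        if (store.hasNext()) {
            store.next();
        }
        while (store.hasNext()) {
            result.add(store.next());
        }
        return result;
    }

    public static void main(String[] args) {
        // Four puts with values 1..4: store holds all four duplicates,
        // the cache holds only the last write, 4.
        System.out.println(read(4, List.of(1, 2, 3, 4))); // prints [4, 2, 3, 4]
    }
}
```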



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
