[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Georgi Petkov updated KAFKA-9921:
---------------------------------
    Description: 
I'm using the latest version, 2.5.0, but this problem is not new.

I have a _WindowStateStore_ configured as follows (where _true_ is the 
_retainDuplicates_ parameter):
 _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
retentionPeriod, windowSize, *true*), keySerde, 
valueSerde)*.withCachingEnabled()*)_

If I put 4 key-value pairs with the same key and values *1, 2, 3, 4*, in that 
order, then reading them back through the iterator returns the values 
*4, 2, 3, 4*.
 I've done a bit of investigation, and the problem is that *the whole 
caching feature is written without considering the case where duplicates 
are retained*.

The observed behavior occurs because the cache holds only the last value (it 
can hold only one value per key, since it is not aware of the retain-duplicates 
option), and that value is read first, while the first entry from the RocksDB 
iterator is skipped even though the values differ. This can be seen (for 
version 2.5.0) in _AbstractMergedSortedCacheStoreIterator#next()_, lines 95-97. 
The next 3 values are then read from the RocksDB iterator, so they are as 
expected.
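
To make the merge behavior concrete, here is a minimal stand-alone sketch in plain Java. It is a simplified model of what is described above, not the Kafka Streams source: the class name MergedIteratorModel, the methods readAll and demo, and the use of a TreeMap for the cache and a list for the RocksDB side are all assumptions made for illustration.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: the cache keeps one value per key, the store keeps duplicates.
final class MergedIteratorModel {

    // Merges cache and store in key order. On equal keys the cached value is
    // returned and one store entry is skipped, as the report describes.
    static List<Integer> readAll(TreeMap<String, Integer> cache,
                                 List<Map.Entry<String, Integer>> store) {
        List<Integer> out = new ArrayList<>();
        Iterator<Map.Entry<String, Integer>> cacheIt = cache.entrySet().iterator();
        Map.Entry<String, Integer> nextCache = cacheIt.hasNext() ? cacheIt.next() : null;
        int storePos = 0;

        while (nextCache != null || storePos < store.size()) {
            if (nextCache == null) {
                // Cache exhausted: read straight from the store.
                out.add(store.get(storePos++).getValue());
                continue;
            }
            if (storePos >= store.size()) {
                // Store exhausted: read straight from the cache.
                out.add(nextCache.getValue());
                nextCache = cacheIt.hasNext() ? cacheIt.next() : null;
                continue;
            }
            int cmp = nextCache.getKey().compareTo(store.get(storePos).getKey());
            if (cmp > 0) {
                out.add(store.get(storePos++).getValue());
            } else {
                out.add(nextCache.getValue());
                nextCache = cacheIt.hasNext() ? cacheIt.next() : null;
                if (cmp == 0) {
                    storePos++; // same key: the store entry is dropped unread
                }
            }
        }
        return out;
    }

    // Puts values 1, 2, 3, 4 under one key; the cache ends up holding only the last one.
    static List<Integer> demo() {
        TreeMap<String, Integer> cache = new TreeMap<>();
        List<Map.Entry<String, Integer>> store = new ArrayList<>();
        for (int value = 1; value <= 4; value++) {
            cache.put("k", value); // overwrites the previous cached value
            store.add(new AbstractMap.SimpleImmutableEntry<>("k", value));
        }
        return readAll(cache, store);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [4, 2, 3, 4]
    }
}
```

In this model the cached 4 replaces the stored 1, and the remaining store entries 2, 3, 4 pass through untouched, which matches the sequence reported above.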

As I said, the whole feature does not consider the _retainDuplicates_ option, 
so there are other examples of incorrect behavior, such as 
_AbstractMergedSortedCacheStoreIterator#peekNextKey()_: each call skips one 
duplicate entry in the RocksDB iterator for the given key.
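
The peek problem can be sketched the same way. The following is a hypothetical plain-Java model (PeekSkipModel and its members are invented names, and the logic is an assumption based on the description above, not a copy of the Kafka code): a peek that advances the store side on a key match is harmless when keys are unique, but loses one stored duplicate per call when they are not.

```java
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical model of a peek that has a side effect on the store iterator.
final class PeekSkipModel {
    private final String cacheKey;        // the single cached key, or null if none
    private final List<String> storeKeys; // store keys in order, duplicates allowed
    private int storePos = 0;

    PeekSkipModel(String cacheKey, List<String> storeKeys) {
        this.cacheKey = cacheKey;
        this.storeKeys = storeKeys;
    }

    // Returns the next key. On a key match it also advances the store position,
    // so a "peek" silently consumes one store entry.
    String peekNextKey() {
        boolean storeHasNext = storePos < storeKeys.size();
        if (cacheKey == null) {
            if (!storeHasNext) {
                throw new NoSuchElementException();
            }
            return storeKeys.get(storePos);
        }
        if (!storeHasNext) {
            return cacheKey;
        }
        int cmp = cacheKey.compareTo(storeKeys.get(storePos));
        if (cmp > 0) {
            return storeKeys.get(storePos);
        }
        if (cmp == 0) {
            storePos++; // skip the same-keyed store entry
        }
        return cacheKey;
    }

    int remainingStoreEntries() {
        return storeKeys.size() - storePos;
    }

    // Peeks the given number of times over four duplicates of one key and
    // reports how many store entries are still readable afterwards.
    static int demo(int peeks) {
        PeekSkipModel it = new PeekSkipModel("k", Arrays.asList("k", "k", "k", "k"));
        for (int i = 0; i < peeks; i++) {
            it.peekNextKey();
        }
        return it.remainingStoreEntries();
    }

    public static void main(String[] args) {
        for (int peeks = 0; peeks <= 4; peeks++) {
            System.out.println(peeks + " peek(s) -> " + demo(peeks) + " store entries left");
        }
    }
}
```

In this model, every call to peekNextKey() on a matching key leaves one fewer stored duplicate to read, even though the caller never asked to consume anything.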

In my use case, I want to persist a list of values for a given key without 
making the cost of handling a single event linear in the size of the list 
(which would be the case if I always read the current list, appended one value, 
and wrote it back). So I store _List<KeyValuePair<K, V>>_ instead of 
_KeyValuePair<K, List<V>>_. The whole use case is more complex than that, so I 
use _#transformValues_ and state stores.
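
The cost difference between the two layouts can be illustrated with a small stand-alone Java sketch. It counts only how many values would be written per layout; the class and method names are hypothetical, and real serialization and store costs are ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical cost model: counts values written, not bytes or store operations.
final class AppendCostModel {

    // One list per key: each event reads the list, appends, and writes it all back.
    static long valuesWrittenWithListPerKey(int events) {
        long written = 0;
        List<Integer> stored = new ArrayList<>();
        for (int i = 0; i < events; i++) {
            List<Integer> updated = new ArrayList<>(stored); // read the current list
            updated.add(i);                                  // append one value
            written += updated.size();                       // write the whole list back
            stored = updated;
        }
        return written;
    }

    // Retained duplicates: each event writes exactly one new entry under the same key.
    static long valuesWrittenWithDuplicates(int events) {
        long written = 0;
        List<Integer> entries = new ArrayList<>();
        for (int i = 0; i < events; i++) {
            entries.add(i);
            written += 1;
        }
        return written;
    }

    public static void main(String[] args) {
        int events = 1000;
        System.out.println("list per key: " + valuesWrittenWithListPerKey(events));
        System.out.println("duplicates:   " + valuesWrittenWithDuplicates(events));
    }
}
```

For n events the list-per-key layout writes n(n+1)/2 values in total, while the duplicate-entry layout writes n, which is why the per-event cost stays constant only in the second case.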

As a result, I'm currently unable to use caching on my state stores.

  was:
I'm using the current latest version 2.5.0 but this is not something new.

I have _WindowStateStore_ configured as follows (where _true_ stands for the 
_retainDuplicates_ parameter):
 _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
retentionPeriod, windowSize, *true*), keySerde, 
valueSerde)*.withCachingEnabled()*)_

If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that 
order when reading them through the iterator I'll get the values *4, 2, 3, 4*.
 I've done a bit of investigation myself and the problem is that *the whole 
caching feature is written without consideration of the case where duplicates 
are retained*.

The observed behavior is due to having the last value in the cache (and it can 
have only one since it's not aware of the retain duplicates option) and it is 
read first (while skipping the first from the RocksDB iterator even though the 
values are different). This can be observed (for version 2.5.0) in 
_AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then we read the 
next 3 values from the RocksDB iterator and they are as expected.

As I said, the whole feature is not considering the _retainDuplicates_ option 
so there are other examples of incorrect behavior like in 
_AbstractMergedSortedCacheStoreIterator#peekNextKey()_ - for each call, you 
would skip one duplicate entry in the RocksDB iterator for the given key.

In my use case, I want to persist a list of values for a given key without 
increasing the complexity to linear for a single event (which would be the case 
if I was always reading the current list appending one value and writing it 
back). So I go for _List<KeyValuePair<K, V>>_ instead of _KeyValuePair<K, 
List<V>>_. The whole use case is more complex than that so I use 
_#transformValues_ and state stores.

So as an impact I'm currently not able to use caching on my state stores.


> Caching is not working properly with WindowStateStore when retaining 
> duplicates
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-9921
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9921
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.5.0
>            Reporter: Georgi Petkov
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
