[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates

2020-05-08 Thread Georgi Petkov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17102974#comment-17102974
 ] 

Georgi Petkov commented on KAFKA-9921:
--

Oh yeah, my bad.

Well, you can definitely start without new interfaces for the time being 
(preserving the current behavior even for the case with _WindowStore#fetch(key, 
time)_). Then you won't need different return types and therefore different 
methods in _Stores_ - those can be added later (potentially when more 
information or opinions are present).
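
For what it's worth, a rough sketch of what such a later addition might look like - every name below is invented for illustration (borrowing the _MultiValueWindowStore_ name suggested elsewhere in this thread); nothing here exists in _Stores_ today:
{code:java}
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

// Hypothetical dedicated interface for duplicate-retaining window stores:
public interface MultiValueWindowStore<K, V> extends StateStore {
    // always appends; a null value is rejected rather than treated as a delete
    void put(K key, V value, long windowStartTimestamp);

    // returns every retained duplicate for the key in the time range
    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
}{code}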

> Caching is not working properly with WindowStateStore when retaining 
> duplicates
> ---
>
> Key: KAFKA-9921
> URL: https://issues.apache.org/jira/browse/KAFKA-9921
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Georgi Petkov
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0, 2.5.1
>
>
> I'm using the current latest version 2.5.0 but this is not something new.
> I have _WindowStateStore_ configured as follows (where _true_ stands for 
> the _retainDuplicates_ parameter):
>  _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
> retentionPeriod, windowSize, *true*), keySerde, 
> valueSerde)*.withCachingEnabled()*)_
> If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that 
> order when reading them through the iterator I'll get the values *4, 2, 3, 4*.
>  I've done a bit of investigation myself and the problem is that *the whole 
> caching feature is written without consideration of the case where duplicates 
> are retained*.
> The observed behavior is due to having the last value in the cache (and it 
> can have only one since it's not aware of the retain duplicates option) and 
> it is read first (while skipping the first from the RocksDB iterator even 
> though the values are different). This can be observed (for version 2.5.0) in 
> _AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then the next 3 
> values are read from the RocksDB iterator so they are as expected.
> As I said, the whole feature is not considering the _retainDuplicates_ option 
> so there are other examples of incorrect behavior like in 
> _AbstractMergedSortedCacheStoreIterator#peekNextKey()_ - for each call, you 
> would skip one duplicate entry in the RocksDB iterator for the given key.
> In my use case, I want to persist a list of values for a given key without 
> increasing the complexity to linear for a single event (which would be the 
> case if I was always reading the current list appending one value and writing 
> it back). So I go for _List<KeyValuePair<key, value>>_ instead of 
> _KeyValuePair<key, List<value>>_. The whole use case is more complex than that so I use 
> _#transformValues_ and state stores.
> So as an impact I can't use caching on my state stores. For others - they'll 
> have incorrect behavior that may take a lot of time to be discovered and even 
> more time to fix the results.
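
To make the reported sequence concrete, a minimal reproduction sketch of the above (store wiring omitted; the store is assumed to be the duplicate-retaining, caching-enabled one from the description, and all names are illustrative):
{code:java}
windowStore.put("key", "1", time);
windowStore.put("key", "2", time);
windowStore.put("key", "3", time);
windowStore.put("key", "4", time);

try (final WindowStoreIterator<String> iterator = windowStore.fetch("key", time, time)) {
    while (iterator.hasNext()) {
        System.out.println(iterator.next().value); // prints 4, 2, 3, 4 instead of 1, 2, 3, 4
    }
}{code}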



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates

2020-05-08 Thread Georgi Petkov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17102959#comment-17102959
 ] 

Georgi Petkov commented on KAFKA-9921:
--

+1 for the new methods in Stores.

Currently, fetches are only for a range, and since the range can contain more 
than one window, a fetch can return more than one value even when not retaining 
duplicates. I don't see any fetch by a key and a single timestamp - or do you 
mean to add one in the future?
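
For reference, a sketch of the two lookup flavors being contrasted here, as declared on the 2.5-era window store interfaces (the store variable and the types are illustrative):
{code:java}
// Point lookup: one key, one window start time - at most one value,
// so duplicates never come into play here:
String single = windowStore.fetch("key", windowStartTimestamp);

// Range fetch: may span several windows (and, with retainDuplicates,
// several entries per window), hence the iterator return type:
try (final WindowStoreIterator<String> iterator = windowStore.fetch("key", timeFrom, timeTo)) {
    while (iterator.hasNext()) {
        final KeyValue<Long, String> entry = iterator.next(); // timestamp -> value
    }
}{code}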



[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates

2020-05-08 Thread Georgi Petkov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17102320#comment-17102320
 ] 

Georgi Petkov commented on KAFKA-9921:
--

I will leave further comments regarding the code in the PR.

Indeed, boolean flags are usually a code smell indicating that perhaps another 
approach should be taken. The idea of adding another decorator sounds good. 
Still, things will be more explicit in the code but not in the API. Therefore 
your main concern - that #put ??should be completely agnostic 
to the presence of duplicates?? - still remains. IMO it's better to mention it 
there than not. Since the library is providing it as a feature, I'd say it's 
less an implementation detail than a clarification of behavior. I can see how a 
user of the API may be confused about whether putting null will remove all entries 
with that key. If it's not written down, personally I would either do a Google 
search, check the implementation, or test it - and then add a regression test in 
my own code, since behavior that is not documented and guaranteed by the library 
could change while I depend on it.
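
A tiny illustration of the ambiguity in question - the store name is made up, and the comment deliberately poses the question rather than asserting the final semantics:
{code:java}
windowStore.put("key", "a", time);  // first entry
windowStore.put("key", "b", time);  // second entry, retained as a duplicate
windowStore.put("key", null, time); // remove both entries? one? neither? - exactly what the javadoc should spell out
{code}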

Regarding the name - _DuplicatesWindowStore_ sounds more like it's storing only 
the duplicates, or at least can be confusing. I would suggest 
_MultiValueWindowStore_ or something similar, as that appears to be the general 
pattern for collections like Multiset and Multimap.



[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates

2020-05-06 Thread Georgi Petkov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17101215#comment-17101215
 ] 

Georgi Petkov commented on KAFKA-9921:
--

Maybe we can add the same information to the WindowStore#put method as well.

It's a personal style preference, but if there are short-circuit checks for 
cases with trivial or no implementation, I would put them at the beginning of 
the method instead of adding more branching to the rest of the logic. So I 
would write:
{code:java}
if (some corner case) {
    doSomething();
    return;
}{code}
instead of:
{code:java}
if (some corner case) {
    doSomething();
} else {
    // nested code
    if (...) {
        ...
    } else {
        ...
    }
}{code}
It's kind of hard to explain. See 
[this|https://softwareengineering.stackexchange.com/questions/18454/should-i-return-from-a-function-early-or-use-an-if-statement]
 question and InMemoryWindowStore#put.
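
To make the comparison concrete, a minimal self-contained sketch of the guard-clause style - the class and its semantics are invented purely for illustration (they just mimic the null/retainDuplicates corner case discussed in this thread):
{code:java}
import java.util.ArrayList;
import java.util.List;

class ValueBuffer {
    private final List<String> values = new ArrayList<>();
    private final boolean retainDuplicates;

    ValueBuffer(final boolean retainDuplicates) {
        this.retainDuplicates = retainDuplicates;
    }

    void put(final String value) {
        // corner case first: with duplicates, a null can neither be stored
        // nor address a specific entry, so there is nothing to do
        if (value == null && retainDuplicates) {
            return;
        }
        // second corner case: without duplicates, null acts as a tombstone
        if (value == null) {
            values.clear();
            return;
        }
        if (!retainDuplicates) {
            values.clear(); // overwrite semantics: at most one value per key
        }
        // the main logic stays unindented and free of else-branches
        values.add(value);
    }
}{code}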

I haven't checked but I would guess that the behavior with null values and 
retainDuplicates has no explicit tests. If that is the case you could add some.
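
Something along these lines, perhaps - a sketch only, assuming the JUnit harness and store setup of the existing window store tests (all names are illustrative):
{code:java}
@Test
public void shouldIgnoreNullPutsWhenRetainingDuplicates() {
    windowStore.put("key", "value", startTime);
    windowStore.put("key", "value", startTime); // retained as a duplicate
    windowStore.put("key", null, startTime);    // should neither store nor delete anything

    try (final WindowStoreIterator<String> iterator = windowStore.fetch("key", startTime, startTime)) {
        assertEquals("value", iterator.next().value);
        assertEquals("value", iterator.next().value);
        assertFalse(iterator.hasNext());
    }
}{code}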



[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates

2020-05-06 Thread Georgi Petkov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1710#comment-1710
 ] 

Georgi Petkov commented on KAFKA-9921:
--

[~ableegoldman] Did you get to read my last comment? At least the bolded text?



[jira] [Updated] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates

2020-04-28 Thread Georgi Petkov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Georgi Petkov updated KAFKA-9921:
-
Summary: Caching is not working properly with WindowStateStore when 
retaining duplicates  (was: Caching is not working properly with 
WindowStateStore when rataining duplicates)



[jira] [Comment Edited] (KAFKA-9921) Caching is not working properly with WindowStateStore when rataining duplicates

2020-04-28 Thread Georgi Petkov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094900#comment-17094900
 ] 

Georgi Petkov edited comment on KAFKA-9921 at 4/28/20, 10:23 PM:
-

[~ableegoldman]

Yeah, I agree that probably not much can be done in terms of caching (compared 
to the options without _retainDuplicates_).

I totally agree that many of the features like the null value behavior are 
correct and make perfect sense from the point of view of the features implemented 
on top of it. Still, it's strange from the perspective of standalone use. 
*1-2 sentences clarifying the behavior with null values in the 
_WindowStateStore_ documentation could definitely help.* In addition, as I said, 
if this is the desired behavior *you can easily skip calling RocksDB for null 
values (when using _retainDuplicates_). This would both make the intention 
clearer and obviously avoid unnecessary calls.*

I do need exactly a stream-stream join, just without the repartition part. I want 
to get matches when there are new events in whichever stream, support duplicate 
keys in the stream, and I use the _WindowStateStore_ only for its retention 
policy. In fact, due to the lack of examples, I was looking at the 
stream-stream join implementation to find out how to correctly use 
_WindowStateStores_. I'm building a library for some common yet not at all 
trivial operations on streams, like topological sorting. Therefore 
I don't know whether the user will provide null values or not. I was curious about 
the behavior with null values so I know what I'm providing to the user. I 
tested it and that's how I found out the exact behavior.

*I'm not sure that an in-memory or any custom state store will solve it.* Yes, 
in-memory helps with the efficient append because it avoids expensive 
calls and serializations/deserializations. Nevertheless, *you will always have 
the serializations/deserializations somewhere - namely the changelog topic - and 
there bandwidth matters too* (not just precious processing time). Even if 
the list is capped at, say, only 5 items you will still have 15 (1 + 2 + 3 + 
4 + 5) events recorded instead of 5. Obviously the size grows pretty fast - 
O(n^2). Combined with the fact that I want to provide a library to many 
different users (and duplicate counts may vary a lot between usages), *to me 
it's best to implement it just as in the stream-stream join - with duplicates*. 
Still, it was a great discussion and it made me more confident in my decisions. 
Thank you for your assistance.
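
A quick sanity check of the growth argument above - re-writing a list of size k on each of n appends versus appending duplicates (plain Java, nothing Kafka-specific assumed):
{code:java}
public class ChangelogGrowth {
    public static void main(final String[] args) {
        final int n = 5;
        int listBased = 0;
        for (int k = 1; k <= n; k++) {
            listBased += k; // each append re-writes the whole list of size k
        }
        System.out.println(listBased);       // 15 changelog records for the list-per-key approach
        System.out.println(n * (n + 1) / 2); // closed form n(n+1)/2, same result
        System.out.println(n);               // 5 records when retaining duplicates instead
    }
}{code}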

*Regarding the PR - it adds the same code to both _WindowStoreBuilder.java_ and 
_TimestampedWindowStoreBuilder.java_ but adds a test for only one of them.*



[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when rataining duplicates

2020-04-28 Thread Georgi Petkov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094900#comment-17094900
 ] 

Georgi Petkov commented on KAFKA-9921:
--

[~ableegoldman]

Yeah, I agree that probably not much can be done in terms of caching (compared 
to the options without _retainDuplicates_).

I totally agree that many of the features like the null value behavior are 
correct and make perfect sense from the point of view of the features implemented 
on top of it. Still, it's strange from the perspective of standalone use. 
*1-2 sentences clarifying the behavior with null values in the 
_WindowStateStore_ documentation could definitely help.* In addition, as I said, 
if this is the desired behavior *you can easily skip calling RocksDB for null 
values (when using _retainDuplicates_). This would both make the intention 
clearer and obviously avoid unnecessary calls.*

I do need exactly a stream-stream join, just without the repartition part. I want 
to get matches when there are new events in whichever stream, and I use the 
_WindowStateStore_ only for its retention policy. In fact, due to the lack of 
examples, I was looking at the stream-stream join implementation to find 
out how to correctly use _WindowStateStores_. I'm building a library for 
some common yet not at all trivial operations on streams, like 
topological sorting. Therefore I don't know whether the user will provide null 
values or not. I was curious about the behavior with null values so I know what 
I'm providing to the user. I tested it and that's how I found out 
the exact behavior.

*I'm not sure that an in-memory or any custom state store will solve it.* Yes, 
in-memory helps with the efficient append because it avoids expensive 
calls and serializations/deserializations. Nevertheless, *you will always have 
the serializations/deserializations somewhere - namely the changelog topic - and 
there bandwidth matters too* (not just precious processing time). Even if 
the list is capped at, say, only 5 items you will still have 15 (1 + 2 + 3 + 
4 + 5) events recorded instead of 5. Obviously the size grows pretty fast - 
O(n^2). Combined with the fact that I want to provide a library to many 
different users (and duplicate counts may vary a lot between usages), *to me 
it's best to implement it just as in the stream-stream join - with duplicates*. 
Still, it was a great discussion and it made me more confident in my decisions. 
Thank you for your assistance.

*Regarding the PR - it adds the same code to both _WindowStoreBuilder.java_ and 
_TimestampedWindowStoreBuilder.java_ but adds a test for only one of them.*


[jira] [Comment Edited] (KAFKA-9921) Caching is not working properly with WindowStateStore when rataining duplicates

2020-04-27 Thread Georgi Petkov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094176#comment-17094176
 ] 

Georgi Petkov edited comment on KAFKA-9921 at 4/28/20, 5:58 AM:


[~ableegoldman] WindowStateStores don't really offer updates (or deletes for 
that matter), at least when using _retainDuplicates_, so `idempotent updates` 
sounds inappropriate to me. For 2 puts I would expect 2 entries regardless of 
whether the values accidentally match.

*I was unable to determine the expected behavior when putting _null_ values in 
_WindowStateStore_ (from the documentation).* It turns out to behave like the 
_KeyValueStore_ - it just deletes the existing entry - unless using 
_retainDuplicates_ - then *nothing happens: neither is the null persisted nor are 
any entries deleted*. I've debugged the code and it reaches all the way to 
calling delete in RocksDB, so I'm not sure this is intended (or at least it could 
be skipped in this case). What do you think? Should I create a separate bug for 
that? What should the expected behavior be?

Is there some other efficient approach for keeping a list by key? In my case, 
the store key is not the partition key but a relation between events, and I 
would like to avoid repartitioning. To be honest, I had a really hard time 
finding the appropriate tools for the job. The API is very limited in 
operations - or at least, no matter how I turn this around, it feels that this is 
not the most efficient way to do things. *I have already partitioned the data by a 
key that serves as a correlation ID (so data within a partition is 
self-contained).* The problem could be summarized as "*I need a stream-stream 
join while avoiding repartitioning*".
 * If I go with the PAPI then I need an efficient retention policy - so I go with 
_WindowStateStore_ (and its not-so-pleasant API when all you need is the 
retention policy). Then I need efficient persisting of values by key - 
retain duplicates (so you only append new values) - but it turns out no 
optimizations in terms of caching are possible. So far this seems like the best 
approach and it is what I'm doing (see the sketch below), but it feels like I'm 
reimplementing the stream-stream join without repartitioning.
 * If I go for the stream-stream join, this means repartitioning both streams first, 
since I have different keys to join by. This means 2 extra topics that won't be 
reused for the internally used _WindowStateStores_ (and I know that my data is 
partitioned well enough already). It would have been nice to have a "my data is 
already properly partitioned / I know what I'm doing" option to avoid 
repartitioning.
 * If I go with the Clients API then I'm basically starting from scratch with an 
API that is hard to use correctly, and there are no state stores available.

*Any advice?* Am I missing something?
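
For reference, the sketch mentioned in the first bullet - a minimal PAPI transformer in the spirit of the stream-stream join, where the store name, types, window size, and join logic are all illustrative assumptions:
{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public class SelfJoinTransformer implements Transformer<String, String, KeyValue<String, String>> {

    private ProcessorContext context;
    private WindowStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        // registered elsewhere with retainDuplicates = true, as in the description
        this.store = (WindowStore<String, String>) context.getStateStore("buffer-store");
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
        final long now = context.timestamp();
        // first emit matches against previously buffered events within the window...
        try (final WindowStoreIterator<String> matches = store.fetch(key, now - 60_000L, now)) {
            while (matches.hasNext()) {
                context.forward(key, value + "+" + matches.next().value);
            }
        }
        // ...then buffer the new event; with retainDuplicates this only appends
        store.put(key, value, now);
        return null; // results are emitted via forward()
    }

    @Override
    public void close() {}
}{code}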



[jira] [Comment Edited] (KAFKA-9921) Caching is not working properly with WindowStateStore when rataining duplicates

2020-04-27 Thread Georgi Petkov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094176#comment-17094176
 ] 

Georgi Petkov edited comment on KAFKA-9921 at 4/28/20, 5:52 AM:


[~ableegoldman] WindowStateStores don't really offer updates (or deletes for 
that matter), at least when using _retainDuplicates_, so `idempotent updates` 
sounds inappropriate to me. For 2 puts I would expect 2 entries regardless of 
whether the values accidentally match.

*I was unable to determine the expected behavior when putting _null_ values in 
_WindowStateStore_ (from the documentation).* It turns out to behave like the 
_KeyValueStore_ - it just deletes the existing entry - unless using 
_retainDuplicates_ - then *nothing happens: neither is the null persisted nor are 
any entries deleted*. I've debugged the code and it reaches all the way to 
calling delete in RocksDB, so I'm not sure this is intended (or at least it could 
be skipped in this case). What do you think? Should I create a separate bug for 
that? What should the expected behavior be?

Is there some other efficient approach for keeping a list by key? In my case, 
the store key is not the partition key but a relation between events, and I 
would like to avoid repartitioning. To be honest, I had a really hard time 
finding the appropriate tools for the job. The API is very limited in 
operations - or at least, no matter how I turn this around, it feels that this is 
not the most efficient way to do things. *I have already partitioned the data by a 
key that serves as a correlation ID (so data within a partition is 
self-contained).* The problem could be summarized as "*I need a stream-stream 
join while avoiding repartitioning*".
 * If I go with the PAPI then I need an efficient retention policy - so I go with 
_WindowStateStore_ (and its not-so-pleasant API when all you need is the 
retention policy). Then I need efficient persisting of values by key - 
retain duplicates (so you only append new values) - but no optimizations in 
terms of caching are possible. So far this seems like the best approach and 
it is what I'm doing.
 * If I go for the stream-stream join, this means repartitioning both streams first, 
since I have different keys to join by. This means 2 extra topics that won't be 
reused for the internally used _WindowStateStores_ (and I know that my data is 
partitioned well enough already). It would have been nice to have a "my data is 
already properly partitioned / I know what I'm doing" option to avoid 
repartitioning.
 * If I go with the Clients API then I'm basically starting from scratch with an 
API that is hard to use correctly, and there are no state stores available.

*Any advice?* Am I missing something?



[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when rataining duplicates

2020-04-27 Thread Georgi Petkov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094176#comment-17094176
 ] 

Georgi Petkov commented on KAFKA-9921:
--

[~ableegoldman] WindowStateStores don't really offer updates (or deletes for 
that matter), at least when using _retainDuplicates_, so `idempotent updates` 
sounds inappropriate to me. For 2 puts I would expect 2 entries regardless of 
whether the values accidentally match.

*I was unable to determine the expected behavior when putting _null_ values in 
_WindowStateStore_ (from the documentation).* It turns out to behave like the 
_KeyValueStore_ - it just deletes the existing entry - unless using 
_retainDuplicates_ - then *nothing happens: neither is the null persisted nor are 
any entries deleted*. I've debugged the code and it reaches all the way to 
calling delete in RocksDB, so I'm not sure this is intended (or at least it could 
be skipped in this case). What do you think? Should I create a separate bug for 
that? What should the expected behavior be?

Is there some other efficient approach for keeping a list by key? In my case, 
the store key is not the partition key but a relation between events, and I 
would like to avoid repartitioning. To be honest, I had a really hard time 
finding the appropriate tools for the job. The API is very limited in 
operations - or at least, no matter how I turn this around, it feels that this is 
not the most efficient way to do things. I have already partitioned the data by a 
key that serves as a correlation ID (so data within a partition is 
self-contained). The problem could be summarized as "I need a stream-stream join 
while avoiding repartitioning".
 * If I go with the PAPI then I need an efficient retention policy - so I go with 
_WindowStateStore_ (and its not-so-pleasant API when all you need is the 
retention policy). Then I need efficient persisting of values by key - 
retain duplicates (so you only append new values) - but no optimizations in 
terms of caching are possible. So far this seems like the best approach and 
it is what I'm doing.
 * If I go for the stream-stream join, this means repartitioning both streams first, 
since I have different keys to join by. This means 2 extra topics that won't be 
reused for the internally used _WindowStateStores_ (and I know that my data is 
partitioned well enough already). It would have been nice to have a "my data is 
already properly partitioned / I know what I'm doing" option to avoid 
repartitioning.
 * If I go with the Clients API then I'm basically starting from scratch with an 
API that is hard to use correctly, and there are no state stores available.

Any advice? Am I missing something?


[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when rataining duplicates

2020-04-27 Thread Georgi Petkov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17093855#comment-17093855
 ] 

Georgi Petkov commented on KAFKA-9921:
--

The same reason you would ever want caching - for the expected performance 
improvement. I already explained why I need to retain the duplicates in the 
second-to-last paragraph.

Now that I think about it, maybe none of the operations can be optimized 
much unless I've already performed a read operation on a key and I repeat it 
later on before a flush (with or without changes in between). That is due to the 
fact that you never know if there are other values for the key you operate on 
unless you call the underlying store first.

But I know all that only because I've familiarized myself with the actual 
implementation. Nobody would ever consider any of this when reading the 
documentation or looking at the API. Even if you decide that no significant 
performance gain can be obtained, at the very least this combination 
of caching and retaining duplicates should be disabled, since the results are 
incorrect even in a straightforward scenario like the one provided in the description.



[jira] [Updated] (KAFKA-9921) Caching is not working properly with WindowStateStore when rataining duplicates

2020-04-26 Thread Georgi Petkov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Georgi Petkov updated KAFKA-9921:
-
Description: 
I'm using the current latest version 2.5.0 but this is not something new.

I have _WindowStateStore_ configured as follows (where _true_ stands for the 
_retainDuplicates_ parameter):
 _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
retentionPeriod, windowSize, *true*), keySerde, 
valueSerde)*.withCachingEnabled()*)_

If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that 
order when reading them through the iterator I'll get the values *4, 2, 3, 4*.
 I've done a bit of investigation myself and the problem is that *the whole 
caching feature is written without consideration of the case where duplicates 
are retained*.

The observed behavior is due to having the last value in the cache (and it can 
have only one since it's not aware of the retain duplicates option) and it is 
read first (while skipping the first from the RocksDB iterator even though the 
values are different). This can be observed (for version 2.5.0) in 
_AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then the next 3 
values are read from the RocksDB iterator so they are as expected.

As I said, the whole feature is not considering the _retainDuplicates_ option 
so there are other examples of incorrect behavior like in 
_AbstractMergedSortedCacheStoreIterator#peekNextKey()_ - for each call, you 
would skip one duplicate entry in the RocksDB iterator for the given key.

In my use case, I want to persist a list of values for a given key without 
increasing the complexity to linear for a single event (which would be the case 
if I was always reading the current list appending one value and writing it 
back). So I go for _List<KeyValuePair<key, value>>_ instead of _KeyValuePair<key, 
List<value>>_. The whole use case is more complex than that so I use 
_#transformValues_ and state stores.

So as an impact I can't use caching on my state stores. For others - they'll 
have incorrect behavior that may take a lot of time to be discovered and even 
more time to fix the results.




[jira] [Updated] (KAFKA-9921) Caching is not working properly with WindowStateStore when rataining duplicates

2020-04-26 Thread Georgi Petkov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Georgi Petkov updated KAFKA-9921:
-
Description: 
I'm using the current latest version 2.5.0 but this is not something new.

I have _WindowStateStore_ configured as follows (where _true_ stands for the 
_retainDuplicates_ parameter):
 _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
retentionPeriod, windowSize, *true*), keySerde, 
valueSerde)*.withCachingEnabled()*)_

If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that 
order when reading them through the iterator I'll get the values *4, 2, 3, 4*.
 I've done a bit of investigation myself and the problem is that *the whole 
caching feature is written without consideration of the case where duplicates 
are retained*.

The observed behavior is due to having the last value in the cache (and it can 
have only one since it's not aware of the retain duplicates option) and it is 
read first (while skipping the first from the RocksDB iterator even though the 
values are different). This can be observed (for version 2.5.0) in 
_AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then the next 3 
values are read from the RocksDB iterator so they are as expected.

As I said, the whole feature is not considering the _retainDuplicates_ option 
so there are other examples of incorrect behavior like in 
_AbstractMergedSortedCacheStoreIterator#peekNextKey()_ - for each call, you 
would skip one duplicate entry in the RocksDB iterator for the given key.

In my use case, I want to persist a list of values for a given key without 
increasing the complexity to linear for a single event (which would be the case 
if I was always reading the current list appending one value and writing it 
back). So I go for _List<KeyValuePair<key, value>>_ instead of _KeyValuePair<key, 
List<value>>_. The whole use case is more complex than that so I use 
_#transformValues_ and state stores.

So as an impact I'm currently not able to use caching on my state stores.

  was:
I'm using the current latest version 2.5.0 but this is not something new.

I have _WindowStateStore_ configured as following (where _true_ stands for the 
_retainDuplicates_ paramter):
 _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
retentionPeriod, windowSize, *true*), keySerde, 
valueSerde)*.withCachingEnabled()*)_

If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that 
order when reading them through the iterator I'll get the values *4, 2, 3, 4*.
 I've done a bit of investigation myself and the problem is that *the whole 
caching feature is written without consideration of the case where duplicates 
are retained*.

The observed behavior is due to having the last value in the cache (and it can 
have only one since it's not aware of the retain duplicates option) and it is 
read first (while skipping the first from the RocksDB iterator even though the 
values are different). This can be observed (for version 2.5.0) in 
_AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then we read the 
next 3 values from the RocksDB iterator and they are as expected.

As I said, the whole feature is not considering the _retainDuplicates_ option 
so there are other examples of incorrect behavior like in 
_AbstractMergedSortedCacheStoreIterator#peekNextKey()_ - for each call, you 
would skip one duplicate entry in the RocksDB iterator for the given key.

In my use case, I want to persist a list of values for a given key without 
increasing the complexity to linear for a single event (which would be the case 
if I was always reading the current list appending one value and writing it 
back). So I go for _List<KeyValuePair<Key, Value>>_ instead of 
_KeyValuePair<Key, List<Value>>_. The whole use case is more complex than that so I use 
_#transformValues_ and state stores.

So as an impact I'm currently not able to use caching on my state stores.


> Caching is not working properly with WindowStateStore when retaining 
> duplicates
> ---
>
> Key: KAFKA-9921
> URL: https://issues.apache.org/jira/browse/KAFKA-9921
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Georgi Petkov
>Priority: Major
>
> I'm using the latest version, 2.5.0, but this is not something new.
> I have _WindowStateStore_ configured as follows (where _true_ stands for 
> the _retainDuplicates_ parameter):
>  _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
> retentionPeriod, windowSize, *true*), keySerde, 
> valueSerde)*.withCachingEnabled()*)_
> If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that 
> order when reading them through the iterator I'll get the values *4, 2, 3, 4*.
>  I've done a bit of investigation myself and the problem is that *the whole 
> caching feature is written without consideration of the case where duplicates 
> are retained*.

[jira] [Updated] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates

2020-04-26 Thread Georgi Petkov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Georgi Petkov updated KAFKA-9921:
-
Description: 
I'm using the latest version, 2.5.0, but this is not something new.

I have _WindowStateStore_ configured as follows (where _true_ stands for the 
_retainDuplicates_ parameter):
 _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
retentionPeriod, windowSize, *true*), keySerde, 
valueSerde)*.withCachingEnabled()*)_

If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that 
order when reading them through the iterator I'll get the values *4, 2, 3, 4*.
 I've done a bit of investigation myself and the problem is that *the whole 
caching feature is written without consideration of the case where duplicates 
are retained*.

The observed behavior is due to having the last value in the cache (and it can 
have only one since it's not aware of the retain duplicates option) and it is 
read first (while skipping the first from the RocksDB iterator even though the 
values are different). This can be observed (for version 2.5.0) in 
_AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then we read the 
next 3 values from the RocksDB iterator and they are as expected.

As I said, the whole feature is not considering the _retainDuplicates_ option 
so there are other examples of incorrect behavior like in 
_AbstractMergedSortedCacheStoreIterator#peekNextKey()_ - for each call, you 
would skip one duplicate entry in the RocksDB iterator for the given key.

In my use case, I want to persist a list of values for a given key without 
increasing the complexity to linear for a single event (which would be the case 
if I was always reading the current list appending one value and writing it 
back). So I go for _List<KeyValuePair<Key, Value>>_ instead of 
_KeyValuePair<Key, List<Value>>_. The whole use case is more complex than that so I use 
_#transformValues_ and state stores.

So as an impact I'm currently not able to use caching on my state stores.

  was:
I'm using the latest version, 2.5.0, but this is not something new.

I have _WindowStateStore_ configured as follows (where _true_ stands for the 
_retainDuplicates_ parameter):
 _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
retentionPeriod, windowSize, *true*), keySerde, 
valueSerde)*.withCachingEnabled()*)_

If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that 
order when reading them through the iterator I'll get the values *4, 2, 3, 4*.
I've done a bit of investigation myself and the problem is that *the whole 
caching feature is written without consideration of the case where duplicates 
are retained*.

The observed behavior is due to having the last value in the cache (and it can 
have only one since it's not aware of the retain duplicates option) and it is 
read first (while skipping the first from the RocksDB iterator even though the 
values are different). This can be observed (for version 2.5.0) in 
_AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then we read the 
next 3 values from the RocksDB iterator and they are as expected.

As I said, the whole feature is not considering the _retainDuplicates_ option 
so there are other examples of incorrect behavior like in 
_AbstractMergedSortedCacheStoreIterator#peekNextKey()_ - for each call, you 
would skip one duplicate entry in the RocksDB iterator for the given key.

In my use case, I want to persist a list of values for a given key without 
increasing the complexity to linear for a single event (which would be the case 
if I was always reading the current list appending one value and writing it 
back). So I go for _List<KeyValuePair<Key, Value>>_ instead of 
_KeyValuePair<Key, List<Value>>_. The whole use case is more complex than that so I use 
_#transformValues_ and state stores.

So as an impact I'm currently not able to use caching on my state stores.


> Caching is not working properly with WindowStateStore when retaining 
> duplicates
> ---
>
> Key: KAFKA-9921
> URL: https://issues.apache.org/jira/browse/KAFKA-9921
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Georgi Petkov
>Priority: Major
>
> I'm using the latest version, 2.5.0, but this is not something new.
> I have _WindowStateStore_ configured as follows (where _true_ stands for 
> the _retainDuplicates_ parameter):
>  _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
> retentionPeriod, windowSize, *true*), keySerde, 
> valueSerde)*.withCachingEnabled()*)_
> If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that 
> order when reading them through the iterator I'll get the values *4, 2, 3, 4*.
>  I've done a bit of investigation myself and the problem is that *the whole 
> caching feature is written without consideration of the case where duplicates 
> are retained*.

[jira] [Created] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates

2020-04-26 Thread Georgi Petkov (Jira)
Georgi Petkov created KAFKA-9921:


 Summary: Caching is not working properly with WindowStateStore 
when retaining duplicates
 Key: KAFKA-9921
 URL: https://issues.apache.org/jira/browse/KAFKA-9921
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.5.0
Reporter: Georgi Petkov


I'm using the latest version, 2.5.0, but this is not something new.

I have _WindowStateStore_ configured as follows (where _true_ stands for the 
_retainDuplicates_ parameter):
 _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
retentionPeriod, windowSize, *true*), keySerde, 
valueSerde)*.withCachingEnabled()*)_

If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that 
order when reading them through the iterator I'll get the values *4, 2, 3, 4*.
I've done a bit of investigation myself and the problem is that *the whole 
caching feature is written without consideration of the case where duplicates 
are retained*.

The observed behavior is due to having the last value in the cache (and it can 
have only one since it's not aware of the retain duplicates option) and it is 
read first (while skipping the first from the RocksDB iterator even though the 
values are different). This can be observed (for version 2.5.0) in 
_AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then we read the 
next 3 values from the RocksDB iterator and they are as expected.

As I said, the whole feature is not considering the _retainDuplicates_ option 
so there are other examples of incorrect behavior like in 
_AbstractMergedSortedCacheStoreIterator#peekNextKey()_ - for each call, you 
would skip one duplicate entry in the RocksDB iterator for the given key.

In my use case, I want to persist a list of values for a given key without 
increasing the complexity to linear for a single event (which would be the case 
if I was always reading the current list appending one value and writing it 
back). So I go for _List<KeyValuePair<Key, Value>>_ instead of 
_KeyValuePair<Key, List<Value>>_. The whole use case is more complex than that so I use 
_#transformValues_ and state stores.

So as an impact I'm currently not able to use caching on my state stores.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)