[jira] [Comment Edited] (KAFKA-9921) Caching is not working properly with WindowStateStore when rataining duplicates
[ https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094900#comment-17094900 ] Georgi Petkov edited comment on KAFKA-9921 at 4/28/20, 10:23 PM: - [~ableegoldman] Yeah, I agree that probably not much can be done in terms of caching (compared to the options without _retainDuplicates_). I totally agree that many of the features like the null value behavior are correct and make perfect sense from point of view of the features implemented with it. Still, it's strange from the perspective where you use it standalone. *1-2 sentences clarifying the behavior with null values in the _WindowStateStore_ documentation could definitely help.* In addition, as I said if this is the desired behavior *you can easily skip calling RocksDB for null values (when using _retainDuplicates)_. This would both make the intention clearer and obviously avoid unnecessary calls.* I do need exactly stream-stream join but without the repartition part. I want to get matches when there are new events in whichever stream, support duplicate keys in the stream and I also use _WindowStateStore_ only for the retention policy. In fact, due to the lack of many examples, I was looking at the stream-stream join implementation to find out how to correctly use the _WindowStateStores_. I'm building a library for some common yet not trivial at all operations on streams that you may need like topological sorting. Therefore I don't know if the user will provide null values or not. I was curious about the behavior with null values so I know what I'm providing to the user. I've tested it and that's how I found out what is the exact behavior. *I'm not sure that an in-memory or any custom state store will make it.* Yes, in-memory will help with the efficient append because it avoids any expensive call and serializations/deserializations. Nevertheless, *you will always have the serializations/deserializations somewhere and this is the changelog topic and there you have also bandwidth* (not just precious processing time). Even if the list is fixed to let's say only 5 items you will still have 15 (1 + 2 + 3 + 4 + 5) events recorded instead of 5. Obviously the size grows pretty fast - O(n^2). Combined with the fact that I want to provide a library to many different users (and duplicates count may vary a lot between usages) *to me it's best to implement just as in the stream-stream join - with duplicates*. Still, it was a great discussion and made me more confident in my decisions. Thank you for your assistance. *Regarding the PR - it adds the same code to both _WindowStoreBuilder.java_ and _TimestampedWindowStoreBuilder.java_ but adds a test for only one of them.* was (Author: georgi.petkov): [~ableegoldman] Yeah, I agree that probably not much can be done in terms of caching (compared to the options without _retainDuplicates_). I totally agree that many of the features like the null value behavior are correct and make perfect sense from point of view of the features implemented with it. Still, it's strange from the perspective where you use it standalone. *1-2 sentences clarifying the behavior with null values in the _WindowStateStore_ documentation could definitely help.* In addition, as I said if this is the desired behavior *you can easily skip calling RocksDB for null values (when using _retainDuplicates)_. This would make both the intention clearer and obviously avoid unnecessary calls.* I do need exactly stream-stream join but without the repartition part. I want to get matches when there are new events in whichever stream and I also use _WindowStateStore_ only for the retention policy. In fact, due to the lack of many examples, I was looking at the stream-stream join implementation to find out how to correctly use the _WindowStateStores_. I'm building a library for some common yet not trivial at all operations on streams that you may need like topological sorting. Therefore I don't know if the user will provide null values or not. I was curious about the behavior with null values so I know what I'm providing to the user. I've tested it and that's how I found out what is the exact behavior. *I'm not sure that an in-memory or any custom state store will make it.* Yes, in-memory will help with the efficient append because it avoids any expensive call and serializations/deserializations. Nevertheless, *you will always have the serializations/deserializations somewhere and this is the changelog topic and there you have also bandwidth* (not just precious processing time). Even if the list is fixed to let's say only 5 items you will still have 15 (1 + 2 + 3 + 4 + 5) events recorded instead of 5. Obviously the size grows pretty fast - O(n^2). Combined with the fact that I want to provide a library to many different users (and duplicates count may vary a lot
[jira] [Comment Edited] (KAFKA-9921) Caching is not working properly with WindowStateStore when rataining duplicates
[ https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094176#comment-17094176 ] Georgi Petkov edited comment on KAFKA-9921 at 4/28/20, 5:58 AM: [~ableegoldman] WindowStateStores don't really offer updates (or deletes in that matter) at least when using _retainDuplicates_ so `idempotent updates` sounds inappropriate to me. For 2 puts I would expect 2 entries regardless if they accidentally match. *I was unable to determine the expected bahavior when putting _null_ values in _WindowStateStore_ (from documentation).* It turns out behaving like the _KeyValueStore_ - just delete the existing entry unless using _retainDuplicates_ - then *nothing happens, neither null is persisted nor any entries are deleted*. I've debugged the code and it reaches all the way to calling delete in RocksDB, so I'm not sure this is intended (or at least could be skipped in this case). What do you think? Should I create a separate bug for that? What should be the expected behavior? Is there some other efficient approach for keeping a list by key? In my case, the store key is not the partition key but a relation between events and I would like to avoid repartitioning. To be honest I had a really hard time finding the appropriate tools for the job. The API is very limited in operations or at least no matter how I turn this around it feels that this is not the most efficient way to do things. *I have already partitioned data by a key that serves as a correlation ID (so data within a partition is self-contained).* The problem could be summarized to "*I need stream-stream join while avoiding repartitioning*". * If I go with PAPI then I need an efficient retention policy - go with _WindowStateStore_ (and its not that pleasant API when all you need is the retention policy). Then I need an efficient persisting of values by key - retain duplicates (so you only append new values), but it turns out no optimizations in terms of caching are possible. So far this seems like the best approach and this is what I'm doing but it seems like I'm reimplementing stream-stream join without repartitioning. * If I go for stream-stream join this means to repartition both streams first since I have different keys to join by. This means 2 extra topics that won't be reused for the internally used _WindowStateStores_ (and I know that my data is partitioned well enough already). It would have been nice if I had the option to avoid repartitioning with a "my data is already properly partitioned / I know what I'm doing" option. * If I go with Clients API then I'm basically starting from scratch with API that is hard to use right and there are no state stores available. *Any advice?* Am I missing something? was (Author: georgi.petkov): [~ableegoldman] WindowStateStores don't really offer updates (or deletes in that matter) at least when using _retainDuplicates_ so `idempotent updates` sounds inappropriate to me. For 2 puts I would expect 2 entries regardless if they accidentally match. *I was unable to determine the expected bahavior when putting _null_ values in _WindowStateStore_ (from documentation).* It turns out behaving like the _KeyValueStore_ - just delete the existing entry unless using _retainDuplicates_ - then *nothing happens, neither null is persisted nor any entries are deleted*. I've debugged the code and it reaches all the way to calling delete in RocksDB, so I'm not sure this is intended (or at least could be skipped in this case). What do you think? Should I create a separate bug for that? What should be the expected behavior? Is there some other efficient approach for keeping a list by key? In my case, the store key is not the partition key but a relation between events and I would like to avoid repartitioning. To be honest I had a really hard time finding the appropriate tools for the job. The API is very limited in operations or at least no matter how I turn this around it feels that this is not the most efficient way to do things. *I have already partitioned data by a key that serves as a correlation ID (so data within a partition is self-contained).* The problem could be summarized to "*I need stream-stream join while avoiding repartitioning*". * If I go with PAPI then I need an efficient retention policy - go with _WindowStateStore_ (and its not that pleasant API when all you need is the retention policy). Then I need an efficient persisting of values by key - retain duplicates (so you only append new values), but no optimizations in terms of caching are possible. So far this seems like the best approach and this is what I'm doing. * If I go for stream-stream join this means to repartition both streams first since I have different keys to join by. This means 2 extra topics that won't be reused for the internally used
[jira] [Comment Edited] (KAFKA-9921) Caching is not working properly with WindowStateStore when rataining duplicates
[ https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094176#comment-17094176 ] Georgi Petkov edited comment on KAFKA-9921 at 4/28/20, 5:52 AM: [~ableegoldman] WindowStateStores don't really offer updates (or deletes in that matter) at least when using _retainDuplicates_ so `idempotent updates` sounds inappropriate to me. For 2 puts I would expect 2 entries regardless if they accidentally match. *I was unable to determine the expected bahavior when putting _null_ values in _WindowStateStore_ (from documentation).* It turns out behaving like the _KeyValueStore_ - just delete the existing entry unless using _retainDuplicates_ - then *nothing happens, neither null is persisted nor any entries are deleted*. I've debugged the code and it reaches all the way to calling delete in RocksDB, so I'm not sure this is intended (or at least could be skipped in this case). What do you think? Should I create a separate bug for that? What should be the expected behavior? Is there some other efficient approach for keeping a list by key? In my case, the store key is not the partition key but a relation between events and I would like to avoid repartitioning. To be honest I had a really hard time finding the appropriate tools for the job. The API is very limited in operations or at least no matter how I turn this around it feels that this is not the most efficient way to do things. *I have already partitioned data by a key that serves as a correlation ID (so data within a partition is self-contained).* The problem could be summarized to "*I need stream-stream join while avoiding repartitioning*". * If I go with PAPI then I need an efficient retention policy - go with _WindowStateStore_ (and its not that pleasant API when all you need is the retention policy). Then I need an efficient persisting of values by key - retain duplicates (so you only append new values), but no optimizations in terms of caching are possible. So far this seems like the best approach and this is what I'm doing. * If I go for stream-stream join this means to repartition both streams first since I have different keys to join by. This means 2 extra topics that won't be reused for the internally used _WindowStateStores_ (and I know that my data is partitioned well enough already). It would have been nice if I had the option to avoid repartitioning with a "my data is already properly partitioned / I know what I'm doing" option. * If I go with Clients API then I'm basically starting from scratch with API that is hard to use right and there are no state stores available. *Any advice?* Am I missing something? was (Author: georgi.petkov): [~ableegoldman] WindowStateStores don't really offer updates (or deletes in that matter) at least when using _retainDuplicates_ so `idempotent updates` sounds inappropriate to me. For 2 puts I would expect 2 entries regardless if they accidentally match. *I was unable to determine the expected bahavior when putting _null_ values in _WindowStateStore_ (from documentation).* It turns out behaving like the _KeyValueStore_ - just delete the existing entry unless using _retainDuplicates_ - then *nothing happens, neither null is persisted nor any entries are deleted*. I've debugged the code and it reaches all the way to calling delete in RocksDB, so I'm not sure this is intended (or at least could be skipped in this case). What do you think? Should I create a separate bug for that? What should be the expected behavior? Is there some other efficient approach for keeping a list by key? In my case, the store key is not the partition key but a relation between events and I would like to avoid repartitioning. To be honest I had a really hard time finding the appropriate tools for the job. The API is very limited in operations or at least no matter how I turn this around it feels that this is not the most efficient way to do things. I have already partitioned data by a key that serves as a correlation ID (so data within a partition is self-contained). The problem could be summarized to "I need stream-stream join while avoiding repartitioning". * If I go with PAPI then I need an efficient retention policy - go with _WindowStateStore_ (and its not that pleasant API when all you need is the retention policy). Then I need an efficient persisting of values by key - retain duplicates (so you only append new values), but no optimizations in terms of caching are possible. So far this seems like the best approach and this is what I'm doing. * If I go for stream-stream join this means to repartition both streams first since I have different keys to join by. This means 2 extra topics that won't be reused for the internally used _WindowStateStores_ (and I know that my data is partitioned well enough already). It would have been nice if I