[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache
[ https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16910650#comment-16910650 ]

James Ritt commented on KAFKA-4212:
-----------------------------------

Hi [~guozhang], thanks for the input! In the comments above, most of us clamoring for this feature seem to have use cases where the TTL only needs to be enabled for RocksDB to prevent unbounded growth, and where strict coherency with the topic isn't required. Given the feedback from Sophie & Matthias, it definitely seemed like we were heading in the KIP direction, as I couldn't come up with a good way to signal usage of the TTL RocksDB through the APIs available now: `RocksDBConfigSetter` can't be repurposed as it currently exists, we don't want to modify the `Stores` API, and `StreamsConfig` seemed too high-level. So if anyone has an idea for how to properly surface this as an option, it would help inform a KIP.

> Add a key-value store that is a TTL persistent cache
> ----------------------------------------------------
>
>                 Key: KAFKA-4212
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4212
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.1
>            Reporter: Elias Levy
>            Priority: Major
>              Labels: api
>
> Some jobs need to maintain as state a large set of key-values for some
> period of time. I.e. they need to maintain a TTL cache of values potentially
> larger than memory.
> Currently Kafka Streams provides non-windowed and windowed key-value stores.
> Neither is an exact fit to this use case.
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as
> required, but does not support expiration. The TTL option of RocksDB is
> explicitly not used.
> The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via segment
> dropping, but it stores multiple items per key, based on their timestamp.
> But this store can be repurposed as a cache by fetching the items in reverse
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowsStore}} can be repurposed as a cache, it would be
> useful to have an official and proper TTL cache API and implementation.

--
This message was sent by Atlassian Jira (v8.3.2#803003)
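The workaround mentioned in the issue description quoted above (keep several timestamped items per key, read them newest first, and return the first one found) can be sketched roughly as follows. This is only an illustration in plain Java: `WindowedCacheSketch` and its methods are hypothetical stand-ins rather than Kafka Streams classes, and the `expire` method is just a loose analogue of segment dropping.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for a windowed store used as a cache; not a Kafka Streams class.
final class WindowedCacheSketch {
    private final long retentionMs;
    // key -> (timestamp -> value), with each inner map sorted by timestamp
    private final Map<String, NavigableMap<Long, String>> entries = new HashMap<>();

    WindowedCacheSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // A windowed store keeps every timestamped item for a key.
    void put(String key, long timestampMs, String value) {
        entries.computeIfAbsent(key, k -> new TreeMap<>()).put(timestampMs, value);
    }

    // Cache-style read: look at items in reverse chronological order and
    // return the first one found, provided it is still inside the retention period.
    String get(String key, long nowMs) {
        NavigableMap<Long, String> byTime = entries.get(key);
        if (byTime == null) {
            return null;
        }
        Map.Entry<Long, String> newest = byTime.descendingMap().firstEntry();
        if (newest == null || newest.getKey() <= nowMs - retentionMs) {
            return null; // the newest item is expired, so every older one is too
        }
        return newest.getValue();
    }

    // Loose analogue of segment dropping: discard everything at or before the cutoff.
    void expire(long nowMs) {
        long cutoff = nowMs - retentionMs;
        entries.values().forEach(byTime -> byTime.headMap(cutoff, true).clear());
        entries.values().removeIf(Map::isEmpty);
    }
}
```

The sketch also shows why the issue calls this a repurposing rather than a proper fit: every write for a key is retained until it expires, even though a cache only ever needs the newest one.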
[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache
[ https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16901545#comment-16901545 ]

James Ritt commented on KAFKA-4212:
-----------------------------------

Thanks [~ableegoldman] for spending your time with us on this! Unfortunately, the `RocksDBStore` class relies on a handful of classes private to the `org.apache.kafka.streams.state.internals` package, so a quick copy & paste job didn't appear to be an option; as such, we're maintaining our own fork with the changes proposed above patched in.
[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache
[ https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16890580#comment-16890580 ]

James Ritt commented on KAFKA-4212:
-----------------------------------

Hi [~ableegoldman] [~mjsax] [~guozhang] [~bbejeck], any further thoughts on this? If possible, I'd like to help get this supported inside Streams, but if that's not a likely outcome we will probably just end up doing something like the patch above locally (we'll have to do something before too long, since our RocksDB is growing unbounded).
[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache
[ https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885498#comment-16885498 ]

James Ritt commented on KAFKA-4212:
-----------------------------------

Thanks [~ableegoldman] & [~mjsax], you both make good points. Unfortunately, the `Map` argument to `setConfig()` appears to be populated from `org.apache.kafka.streams.processor.ProcessorContext#appConfigs`, meaning it's effectively just pulling in the current `StreamsConfig`. In order to keep the configs consolidated as per Sophie's request, what do we think about defining a new method on the `RocksDBConfigSetter` interface? We could create it with a default implementation for backwards compatibility, something like `default Integer getTtl(final String storeName, final Options options) { return null; }`, with the idea that we'd check this method's result to determine which RocksDB constructor to call.
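To make the default-method idea in the comment above concrete, here is a minimal sketch. It deliberately avoids the real Kafka and RocksDB classes so that it compiles on its own: `OptionsStub` stands in for `org.rocksdb.Options`, `TtlAwareConfigSetter` is a hypothetical mirror of `RocksDBConfigSetter` with the proposed `getTtl` method added, and `StoreOpenSketch.chooseOpenPath` is an invented helper that only reports which open path would be taken.

```java
import java.util.Map;

// Stand-in for org.rocksdb.Options so the sketch compiles without RocksDB on the classpath.
final class OptionsStub {
}

// Hypothetical mirror of RocksDBConfigSetter with the proposed method added.
interface TtlAwareConfigSetter {
    void setConfig(String storeName, OptionsStub options, Map<String, Object> configs);

    // Proposed addition: null means "no TTL". Because it has a default body,
    // existing implementations keep compiling and keep today's behavior.
    default Integer getTtl(final String storeName, final OptionsStub options) {
        return null;
    }
}

final class StoreOpenSketch {
    // Illustrative only: reports which open path a store would take,
    // instead of actually opening a database.
    static String chooseOpenPath(TtlAwareConfigSetter setter, String storeName, OptionsStub options) {
        Integer ttlSeconds = setter.getTtl(storeName, options);
        return ttlSeconds == null ? "plain" : "ttl:" + ttlSeconds;
    }
}
```

A setter written before the change, for example `TtlAwareConfigSetter legacy = (name, opts, cfg) -> {};`, would take the `"plain"` path, while one that overrides `getTtl` to return `3600` would take `"ttl:3600"`. In the real store, that branch is where the choice between the regular RocksDB constructor and the TTL-enabled one would presumably be made.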
[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache
[ https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16882449#comment-16882449 ]

James Ritt commented on KAFKA-4212:
-----------------------------------

Thanks [~ableegoldman] & [~mjsax]! So I took a look at [`RocksDBConfigSetter`|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java#L45], and my current understanding is that its `setConfig` method is used to mutate the provided `org.rocksdb.Options` before it's passed into the RocksDB constructor. Unfortunately, the TTL doesn't seem to be configurable within `org.rocksdb.Options`; as far as I can tell, using the TtlDB would instead require calling the appropriate constructor [here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L191]. Please let me know if I'm missing something, but given the above, the next approach I was considering was adding a StreamsConfig rocksDbTtl setting similar to [here|https://github.com/apache/kafka/pull/2159/files]; your thoughts?
[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache
[ https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878024#comment-16878024 ]

James Ritt commented on KAFKA-4212:
-----------------------------------

Hi [~mjsax], thanks for your input! I'm not the original requestor, but my understanding of this JIRA is that it would be useful to have a TTL KV cache within Streams. I'd agree with that, and it appears a commenter or two above agree as well. The main draw for me is that the TTL cache, instead of growing unbounded, would essentially mimic the underlying topic, which we created with `cleanup.policy=delete` & `delete.retention.ms` set.

I could very much be wrong, but I think RocksDB and a topic set up with `cleanup.policy=delete` & `delete.retention.ms` both work off wall-clock time, so they would be congruent in that respect? It's true that not being event-based could introduce discrepancies between the two. In particular, imagine the cache is configured with the same TTL as the topic but is offline for a couple of hours: when it comes back online, it will hold onto the values for an extra couple of hours. That could be fine, though, as long as application semantics don't depend upon cache eviction.

An example might help: our current use case is to store a cache of revoked auth tokens. These tokens contain an expiration and are relatively short-lived, so we set up the containing topic with `delete.retention.ms` equal to their lifetime. We were then hoping to use Streams' `GlobalKTable` cache on this topic. With my PR, we could use the newly added TTL KV cache with the same TTL as the underlying topic. In this situation, wall-clock skew is fine, as there is no harm in the tokens persisting in the cache for extra time. Without this change, I believe our underlying RocksDB cache would grow unbounded.
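The revoked-token example above relies on wall-clock expiry where entries may outlive their TTL without harm. A minimal sketch of that behavior follows, under two loudly stated assumptions: the TTL is treated as a lower bound on an entry's lifetime, and expired entries are only removed when an explicit cleanup pass runs (a rough analogue of compaction). `WallClockTtlCacheSketch` is a hypothetical class written for illustration, not part of Kafka Streams or RocksDB, and the clock is injected so the behavior can be exercised without sleeping.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical wall-clock TTL cache; illustrates "TTL as a lower bound" only.
final class WallClockTtlCacheSketch {
    private static final class Entry {
        final String value;
        final long writtenAtMs;

        Entry(String value, long writtenAtMs) {
            this.value = value;
            this.writtenAtMs = writtenAtMs;
        }
    }

    private final long ttlMs;
    private final LongSupplier clockMs; // injected wall clock, in milliseconds
    private final Map<String, Entry> entries = new HashMap<>();

    WallClockTtlCacheSketch(long ttlMs, LongSupplier clockMs) {
        this.ttlMs = ttlMs;
        this.clockMs = clockMs;
    }

    // Entries are stamped with the wall-clock time of the write, not an event time.
    void put(String key, String value) {
        entries.put(key, new Entry(value, clockMs.getAsLong()));
    }

    // Assumption: reads do not check expiry, so an entry older than the TTL
    // stays visible until the next compact() call.
    String get(String key) {
        Entry entry = entries.get(key);
        return entry == null ? null : entry.value;
    }

    // Cleanup pass: drops entries written at least ttlMs ago and returns how many were removed.
    int compact() {
        long cutoff = clockMs.getAsLong() - ttlMs;
        int before = entries.size();
        entries.values().removeIf(entry -> entry.writtenAtMs <= cutoff);
        return before - entries.size();
    }
}
```

For a revoked-token cache this laxness is acceptable: a token that lingers past its TTL is already expired on its own terms, so rejecting it a little longer does no harm.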
[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache
[ https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16876553#comment-16876553 ]

James Ritt commented on KAFKA-4212:
-----------------------------------

Thanks [~ableegoldman]! I definitely should have included more context in my comment above: the situation we're looking at is that we have an underlying topic with `cleanup.policy=delete` & `delete.retention.ms` set. We then use the Streams API to create a GlobalKTable over that topic. So, in my understanding, the topic will get cleaned out automatically, but without a TTL set for the persistent KV topic cache, the underlying RocksDB will grow unbounded, thus this PR (see also https://stackoverflow.com/questions/48080721/kafka-streams-ktable-from-topic-with-retention-policy?rq=1). But please let me know if I'm wrong!

W.r.t. strictness, thanks for the heads up: in our particular case we're fine with the lower-bound behavior, as it's semantically fine if the values stay in our cache longer than in the topic.
[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache
[ https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16876490#comment-16876490 ]

James Ritt commented on KAFKA-4212:
-----------------------------------

So here is another attempt at this: [https://github.com/apache/kafka/pull/7020]. We exposed a new constructor in `org.apache.kafka.streams.state.Stores` which takes in the TTL. This PR includes a passing unit test; any feedback would be appreciated!