[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2019-08-19 Thread James Ritt (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16910650#comment-16910650
 ] 

James Ritt commented on KAFKA-4212:
---

Hi [~guozhang], thanks for the input! In the comments above, most of us clamoring
for this feature seem to have use cases where TTL needs to be enabled just for
RocksDB in order to prevent unbounded growth, and where strict coherency with
the topic isn't required. Given the feedback from Sophie & Matthias, it
definitely seemed like we were heading in the KIP direction, as I couldn't come
up with a good way to signal use of the TTL RocksDB through the APIs available
now: `RocksDBConfigSetter` can't be repurposed as it currently exists, we don't
want to modify the `Stores` API, and `StreamsConfig` seemed too high-level. So
if anyone has an idea of how to properly surface this as an option, it would
help inform a KIP.

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Priority: Major
>  Labels: api
>
> Some jobs need to maintain, as state, a large set of key-values for some
> period of time, i.e. they need to maintain a TTL cache of values potentially
> larger than memory.
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowStore}}, a {{WindowStore}}, can expire items via segment
> dropping, but it stores multiple items per key, based on their timestamp.
> However, this store can be repurposed as a cache by fetching the items in
> reverse chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowStore}} can be repurposed as a cache, it would be
> useful to have an official and proper TTL cache API and implementation.
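The workaround in the last paragraph (reuse a windowed store, read a key's values newest-first, let old segments fall off) can be modelled without Kafka on the classpath. This is a hypothetical in-memory stand-in, not Kafka's `RocksDBWindowStore` API: `WindowedTtlCache`, `retentionMs` and `segmentMs` are illustrative names, and expiry is deliberately coarse because only whole segments are dropped.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of "window store as TTL cache"; not the Kafka Streams API.
final class WindowedTtlCache<K, V> {
    private final long retentionMs;
    private final long segmentMs;
    // segment id -> (key -> (timestamp -> value)); several values per key, as in a window store
    private final TreeMap<Long, Map<K, TreeMap<Long, V>>> segments = new TreeMap<>();

    WindowedTtlCache(long retentionMs, long segmentMs) {
        this.retentionMs = retentionMs;
        this.segmentMs = segmentMs;
    }

    void put(K key, V value, long timestampMs) {
        long segmentId = Math.floorDiv(timestampMs, segmentMs);
        segments.computeIfAbsent(segmentId, id -> new HashMap<>())
                .computeIfAbsent(key, k -> new TreeMap<>())
                .put(timestampMs, value);
    }

    // Segment dropping: remove every segment lying entirely before (nowMs - retentionMs).
    // Entries in the segment that straddles the cutoff survive until that segment is dropped.
    void expire(long nowMs) {
        long firstLiveSegment = Math.floorDiv(nowMs - retentionMs, segmentMs);
        segments.headMap(firstLiveSegment).clear();
    }

    // Cache lookup: scan segments newest-first and return the latest value found for the key.
    V get(K key) {
        for (Map<K, TreeMap<Long, V>> segment : segments.descendingMap().values()) {
            TreeMap<Long, V> values = segment.get(key);
            if (values != null && !values.isEmpty()) {
                return values.lastEntry().getValue();
            }
        }
        return null;
    }
}
```

The trade-off the issue points at is visible here: the structure keeps every timestamped version of a key until its segment ages out, even though a cache only ever needs the newest one.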



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2019-08-06 Thread James Ritt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16901545#comment-16901545
 ] 

James Ritt commented on KAFKA-4212:
---

Thanks [~ableegoldman] for spending your time with us on this!

Unfortunately, the `RocksDBStore` class relies on a handful of classes private 
to the `org.apache.kafka.streams.state.internals` package, so a quick copy & 
paste job didn't appear to be an option; as such, we're maintaining our own 
fork with the changes proposed above patched in.



[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2019-07-22 Thread James Ritt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16890580#comment-16890580
 ] 

James Ritt commented on KAFKA-4212:
---

Hi [~ableegoldman] [~mjsax] [~guozhang] [~bbejeck], any further thoughts on
this? If possible I'd like to help get this supported inside Streams, but if
that's not a likely outcome we will probably just end up applying something like
the patch above locally (we'll have to do something before long, given that our
RocksDB is growing unbounded).



[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2019-07-15 Thread James Ritt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885498#comment-16885498
 ] 

James Ritt commented on KAFKA-4212:
---

Thanks [~ableegoldman] & [~mjsax], you both make good points. Unfortunately the 
`Map` argument to `setConfig()` appears to be populated from 
`org.apache.kafka.streams.processor.ProcessorContext#appConfigs`, meaning it's 
effectively just pulling in the current `StreamsConfig`.

In order to keep the configs consolidated as per Sophie's request, what do we 
think about defining a new method on the `RocksDBConfigSetter` interface?

We could create it with a default implementation (for backwards compatibility),
something like `default Integer getTtl(final String storeName, final Options
options) { return null; }`, with the idea that we'd check this method's result
to determine which RocksDB open method to call (plain or TTL).
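A minimal sketch of the proposed default method and the dispatch it would drive. Everything here is hypothetical: `TtlAwareConfigSetter` stands in for an extended `RocksDBConfigSetter`, `OptionsStub` replaces `org.rocksdb.Options` so the example compiles without RocksDB, and `StoreOpener` only reports which open path a store would take. In RocksJava the TTL path would go through `TtlDB.open(options, path, ttlSeconds, false)` rather than `RocksDB.open(options, path)`.

```java
import java.util.Map;

// Stand-in for org.rocksdb.Options so the sketch compiles without RocksDB.
final class OptionsStub { }

// Hypothetical: RocksDBConfigSetter plus the proposed backwards-compatible getTtl() default.
interface TtlAwareConfigSetter {
    void setConfig(String storeName, OptionsStub options, Map<String, Object> configs);

    // Existing implementations inherit this default, so they keep opening a plain RocksDB.
    default Integer getTtl(final String storeName, final OptionsStub options) {
        return null;
    }
}

final class StoreOpener {
    enum OpenPath { PLAIN_ROCKSDB, TTL_DB }

    // null -> plain RocksDB.open(...); non-null -> TtlDB.open(..., ttlSeconds, false) in RocksJava.
    static OpenPath choose(TtlAwareConfigSetter setter, String storeName, OptionsStub options) {
        Integer ttlSeconds = setter.getTtl(storeName, options);
        return ttlSeconds == null ? OpenPath.PLAIN_ROCKSDB : OpenPath.TTL_DB;
    }
}
```

Because the store name is passed in, a single setter could enable TTL for some stores and leave others untouched, which keeps the configuration consolidated in one place.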



[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2019-07-10 Thread James Ritt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16882449#comment-16882449
 ] 

James Ritt commented on KAFKA-4212:
---

Thanks [~ableegoldman] & [~mjsax]! So I took a look at
[`RocksDBConfigSetter`|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java#L45]
and my current understanding is that its `setConfig` method is used to mutate
the provided `org.rocksdb.Options` before they are passed to RocksDB when the
database is opened. Unfortunately, TTL doesn't seem to be configurable within
`org.rocksdb.Options`; as far as I can tell, using TtlDB would instead require
opening the database through the appropriate TtlDB call
[here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L191].
Please let me know if I'm missing something, but given the above, the next
approach I was considering was adding a `StreamsConfig` rocksDbTtl setting
similar to [this one|https://github.com/apache/kafka/pull/2159/files]. Your
thoughts?
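For comparison, an application-wide setting would be read from the same configs map that reaches `setConfig()`. The key `rocksdb.ttl.seconds` and the `RocksDbTtlConfig` helper are hypothetical (no such `StreamsConfig` entry exists); the sketch only shows that one value would apply to every RocksDB store in the application.

```java
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical helper for an application-wide TTL setting; not part of StreamsConfig.
final class RocksDbTtlConfig {
    static final String ROCKSDB_TTL_SECONDS_CONFIG = "rocksdb.ttl.seconds"; // hypothetical key

    // Returns the TTL in seconds, or empty when the setting is absent (plain RocksDB).
    static OptionalInt ttlSeconds(Map<String, ?> appConfigs) {
        Object raw = appConfigs.get(ROCKSDB_TTL_SECONDS_CONFIG);
        if (raw == null) {
            return OptionalInt.empty();
        }
        int ttl = (raw instanceof Number)
                ? ((Number) raw).intValue()
                : Integer.parseInt(raw.toString().trim());
        // This sketch rejects non-positive values instead of assigning them a meaning.
        if (ttl <= 0) {
            throw new IllegalArgumentException(
                    ROCKSDB_TTL_SECONDS_CONFIG + " must be positive, got " + ttl);
        }
        return OptionalInt.of(ttl);
    }
}
```

Since nothing in the lookup depends on the store name, every store would share the same TTL, which matches the concern elsewhere in this thread that `StreamsConfig` is too high-level for this option.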



[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2019-07-03 Thread James Ritt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878024#comment-16878024
 ] 

James Ritt commented on KAFKA-4212:
---

Hi [~mjsax], thanks for your input! I'm not the original requester, but my
understanding of this JIRA is that it would be useful to have a TTL KV cache
within Streams: something I'd agree with, and it appears a commenter or two
above agree as well. The main draw for me is that the TTL cache, instead of
growing unbounded, would then essentially mimic the underlying topic, which we
created with `cleanup.policy=delete` & `delete.retention.ms` set.

I could very well be wrong, but I think RocksDB's TTL and a topic set up with
`cleanup.policy=delete` & `delete.retention.ms` both work off wall-clock time,
so they would be congruent in that respect? It's true that not being
event-based could introduce discrepancies between the two (in particular,
imagine the cache is configured with the same TTL as the topic but is offline
for a couple of hours: when it comes back online, it will hold onto the values
for an extra couple of hours), but that could be fine as long as application
semantics don't depend on cache eviction.
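To make the offline scenario concrete, here is a small in-memory model of a wall-clock TTL with RocksDB-style lazy removal. The expiry clock starts when the cache writes the entry, not when the record was produced, and expired entries disappear only when a compaction pass runs (RocksDB documents the same for its TTL mode: expired values may still be read until compaction removes them). `WallClockTtlCache` and `compact()` are illustrative names, and the injected `LongSupplier` clock is an assumption that makes the timing testable.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical model of wall-clock TTL with lazy (compaction-time) removal; not RocksDB itself.
final class WallClockTtlCache<K, V> {
    private static final class Entry<V> {
        final V value;
        final long writtenAtMs; // wall-clock time of the write, not the record's own timestamp

        Entry(V value, long writtenAtMs) {
            this.value = value;
            this.writtenAtMs = writtenAtMs;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMs;
    private final LongSupplier clock;

    WallClockTtlCache(long ttlMs, LongSupplier clock) {
        this.ttlMs = ttlMs;
        this.clock = clock;
    }

    void put(K key, V value) {
        entries.put(key, new Entry<>(value, clock.getAsLong()));
    }

    // Reads do not check expiry: an expired entry stays visible until compact() runs.
    V get(K key) {
        Entry<V> entry = entries.get(key);
        return entry == null ? null : entry.value;
    }

    // Removes entries with writtenAt + ttl < now, so each entry lives at least ttlMs (a lower bound).
    void compact() {
        long now = clock.getAsLong();
        entries.values().removeIf(e -> e.writtenAtMs + ttlMs < now);
    }
}
```

With a 2-hour TTL, a record produced at t=0 but only consumed at t=2h (after downtime) is written with a t=2h timestamp and survives compaction until just after t=4h, i.e. roughly two hours longer than on the topic.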

An example might help: our current use case is to store a cache of revoked auth
tokens. These tokens contain an expiration and are relatively short-lived, so
we set up the containing topic with `delete.retention.ms` equal to their
lifetime. We were then hoping to use a Streams `GlobalKTable` as a cache over
this topic. With my PR, we could use the newly added TTL KV cache with the same
TTL as the underlying topic. In this situation, wall-clock skew is fine, as
there is no harm in tokens persisting in the cache for extra time. Without this
change, I believe our underlying RocksDB cache would grow unbounded.
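Why extra cache lifetime is harmless here can be shown with a hypothetical validation check: a token past its own expiration is rejected whether or not it is still in the revoked-token cache, so a lingering cache entry never changes the outcome. `RevocationCheck` and its parameters are illustrative, not part of Kafka.

```java
import java.util.Set;

// Hypothetical auth check over a revoked-token cache (e.g. one materialized from a GlobalKTable).
final class RevocationCheck {
    static boolean isRejected(String tokenId, long tokenExpiresAtMs, long nowMs,
                              Set<String> revokedTokenIds) {
        // Expired tokens are rejected on their own expiration, so whether the cache
        // still holds them (a stale entry past the topic's retention) does not matter.
        if (nowMs >= tokenExpiresAtMs) {
            return true;
        }
        // Only still-valid tokens depend on the cache; assuming the cache TTL is at least
        // the token lifetime, a revocation of a still-valid token is recent enough to be cached.
        return revokedTokenIds.contains(tokenId);
    }
}
```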



[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2019-07-01 Thread James Ritt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16876553#comment-16876553
 ] 

James Ritt commented on KAFKA-4212:
---

Thanks [~ableegoldman]!

I definitely should have included more context in my comment above: the
situation we're looking at is that we have an underlying topic with
cleanup.policy=delete & delete.retention.ms set. We then use the Streams API to
create a GlobalKTable over that topic. As I understand it, the topic will get
cleaned out automatically, but without a TTL on the persistent KV store backing
that cache, the underlying RocksDB will grow unbounded, hence this PR (see also
https://stackoverflow.com/questions/48080721/kafka-streams-ktable-from-topic-with-retention-policy?rq=1).
But please LMK if I'm wrong!

W.r.t. strictness, thanks for the heads-up: in our particular case we're fine
with the lower-bound behavior, as it's semantically fine if values stay in our
cache longer than in the topic.
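For reference, the topic side of this setup can be written down as a plain config map. One detail worth checking: on a `cleanup.policy=delete` topic, time-based deletion is controlled by `retention.ms`, while `delete.retention.ms` controls how long delete tombstones are kept on compacted topics. The helper name below is hypothetical; in kafka-clients the same keys are available as constants on `org.apache.kafka.common.config.TopicConfig`, and a map like this can be passed to `NewTopic.configs(...)` when creating the topic through the admin client.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: topic-level configs for a topic whose records should age out after tokenLifetime.
final class RevokedTokenTopicConfigs {
    static Map<String, String> forLifetime(Duration tokenLifetime) {
        Map<String, String> configs = new HashMap<>();
        configs.put("cleanup.policy", "delete");                              // TopicConfig.CLEANUP_POLICY_CONFIG
        configs.put("retention.ms", Long.toString(tokenLifetime.toMillis())); // TopicConfig.RETENTION_MS_CONFIG
        return configs;
    }
}
```

Topic retention is enforced per log segment, so records can also outlive `retention.ms` until their segment becomes eligible for deletion: the same lower-bound flavor as the RocksDB TTL discussed above.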



[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2019-07-01 Thread James Ritt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16876490#comment-16876490
 ] 

James Ritt commented on KAFKA-4212:
---

So here is another attempt at this: [https://github.com/apache/kafka/pull/7020].
We exposed a new factory method in org.apache.kafka.streams.state.Stores that
takes in the TTL.

This PR includes a passing unit test; any feedback would be appreciated!
