[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878346#comment-16878346
 ] 

Stanislav Savulchik commented on KAFKA-4212:
--------------------------------------------

Hi, in case it is useful to someone, I'd like to describe an implementation 
that lets me remove expired keys from a KeyValueStore[K, V], emulating the 
missing TTL feature.

The approach requires using:
 # the Transformer/Processor API,
 # stream punctuation,
 # an additional in-memory KeyValueStore as a time-based index of key updates.

Write path: when I create or update a key along with its timestamp in the 
primary KeyValueStore[K, (Timestamp, V)], I also update the secondary 
KeyValueStore[TimeSegment, Set[K]], removing the key from its old time segment 
and adding it to the new one.

Expiration path: on each stream punctuation I fetch the expired time segments 
with a range query [-Inf; LastExpiredTimeSegment], iterate over them, and then 
1) remove the keys in those segments from the primary store and 2) remove the 
expired segments from the secondary store.
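
For illustration, the write and expiration paths can be sketched roughly as 
follows. This is a minimal sketch that assumes plain in-memory Java maps as 
stand-ins for the two state stores; it does not use the Kafka Streams API. The 
class name TtlIndexSketch, the parameters segmentMs and ttlMs, and the 
expire(nowMs) method are hypothetical names chosen for this example only.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for the two stores; not Kafka Streams code. */
final class TtlIndexSketch<K, V> {
    /** A value paired with the timestamp of its last update. */
    private static final class Stamped<V> {
        final long timestampMs;
        final V value;
        Stamped(long timestampMs, V value) {
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    private final long segmentMs; // time segment granularity (assumed parameter)
    private final long ttlMs;     // time to live (assumed parameter)
    // Stand-in for the primary KeyValueStore[K, (Timestamp, V)].
    private final Map<K, Stamped<V>> primary = new HashMap<>();
    // Stand-in for the secondary KeyValueStore[TimeSegment, Set[K]], kept sorted.
    private final TreeMap<Long, Set<K>> index = new TreeMap<>();

    TtlIndexSketch(long segmentMs, long ttlMs) {
        this.segmentMs = segmentMs;
        this.ttlMs = ttlMs;
    }

    private long segmentOf(long timestampMs) {
        return Math.floorDiv(timestampMs, segmentMs);
    }

    /** Write path: update the primary store and move the key between segments. */
    void put(K key, V value, long timestampMs) {
        Stamped<V> old = primary.put(key, new Stamped<>(timestampMs, value));
        if (old != null) {
            long oldSegment = segmentOf(old.timestampMs);
            Set<K> keys = index.get(oldSegment);
            if (keys != null) {
                keys.remove(key);
                if (keys.isEmpty()) {
                    index.remove(oldSegment);
                }
            }
        }
        index.computeIfAbsent(segmentOf(timestampMs), s -> new HashSet<>()).add(key);
    }

    V get(K key) {
        Stamped<V> stamped = primary.get(key);
        return stamped == null ? null : stamped.value;
    }

    /** Expiration path: meant to be called on punctuation; returns keys removed. */
    int expire(long nowMs) {
        // The segment containing (now - TTL) may still hold live keys,
        // so only strictly older segments are treated as fully expired.
        long lastExpiredSegment = segmentOf(nowMs - ttlMs) - 1;
        // Range query [-Inf; lastExpiredSegment] as a live view of the index.
        Map<Long, Set<K>> expired = index.headMap(lastExpiredSegment, true);
        int removed = 0;
        for (Set<K> keys : expired.values()) {
            for (K key : keys) {
                primary.remove(key);
                removed++;
            }
        }
        expired.clear(); // also drops the expired segments from the index
        return removed;
    }
}
```

In a real topology, something like expire(nowMs) would presumably be invoked 
from a punctuator registered with ProcessorContext.schedule, and both maps 
would be replaced by actual state stores. With this layout a key is removed 
somewhere between ttlMs and ttlMs + segmentMs after its last update.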

By adjusting the stream punctuation interval and the time segment granularity, 
I can spread the pauses caused by the expiration procedure over time.

I wish I could just use the missing TTL feature for that!

> Add a key-value store that is a TTL persistent cache
> ----------------------------------------------------
>
>                 Key: KAFKA-4212
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4212
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.1
>            Reporter: Elias Levy
>            Priority: Major
>              Labels: api
>
> Some jobs need to maintain a large set of key-values as state for some 
> period of time, i.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowStore}}, a {{WindowStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> However, this store can be repurposed as a cache by fetching the items in 
> reverse chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
