[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15524402#comment-15524402
 ] 

Elias Levy commented on KAFKA-4212:
-----------------------------------

The general use case is joining updates to two tables over a limited period of
time.

Consider a hypothetical monitoring service that allows clients to query the
status of nodes.  The application may wish to inform a client whenever the
status of a node it has queried changes, but only if the client queried that
node's status during the past 24 hours and the node's latest status differs
from the last status the client received.

To do so, the service can consume two streams: client node status queries
(together with their responses) and node status updates.  From the stream of
client node status queries, the service would maintain a cache of the last
status of each node sent to each client, with entries expiring after 24 hours.
From the node status updates, it would maintain a mapping from each node to its
latest status.

When a client query is received, the service can check the node status mapping
to see if there is a newer status and, if there is, generate a notification.
When a node status update is received, the service can check the cache for the
last status sent to each client and send a notification with the new status to
every client that previously queried for that node's status and last received a
different one.
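A minimal, single-threaded sketch of the two handlers described above, written
as plain Java rather than against the Kafka Streams API.  The class
`NodeStatusNotifier`, its methods, and the notification string format are
hypothetical; it also assumes every record carries an event timestamp in
milliseconds and that "newer" means a status update whose timestamp is later
than the query's.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the notification service; not Kafka Streams code.
class NodeStatusNotifier {
    static final long TTL_MS = 24L * 60 * 60 * 1000; // 24 hours

    record Sent(String status, long queryTs) {} // last status handed to a client
    record Update(String status, long ts) {}    // latest status of a node

    // nodeId -> (clientId -> last status sent); entries expire TTL_MS after the query
    private final Map<String, Map<String, Sent>> sent = new HashMap<>();
    // nodeId -> latest status update
    private final Map<String, Update> latest = new HashMap<>();
    // Emitted notifications, formatted as "clientId/nodeId=status"
    final List<String> notifications = new ArrayList<>();

    // A client queried nodeId at time ts and was answered with `status`.
    void onQuery(String clientId, String nodeId, String status, long ts) {
        String current = status;
        Update u = latest.get(nodeId);
        if (u != null && u.ts() > ts && !u.status().equals(status)) {
            // The node changed after this response was produced: notify right away.
            notifications.add(clientId + "/" + nodeId + "=" + u.status());
            current = u.status();
        }
        sent.computeIfAbsent(nodeId, k -> new HashMap<>())
            .put(clientId, new Sent(current, ts));
    }

    // nodeId changed to `status` at time ts.
    void onUpdate(String nodeId, String status, long ts) {
        latest.put(nodeId, new Update(status, ts));
        Map<String, Sent> clients = sent.get(nodeId);
        if (clients == null) return;
        Iterator<Map.Entry<String, Sent>> it = clients.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Sent> e = it.next();
            Sent s = e.getValue();
            if (ts - s.queryTs() > TTL_MS) {
                it.remove(); // client has not queried this node in the past 24 hours
            } else if (!s.status().equals(status)) {
                notifications.add(e.getKey() + "/" + nodeId + "=" + status);
                e.setValue(new Sent(status, s.queryTs())); // keep the original query time
            }
        }
    }
}
```

Keeping the original query time when a notification is sent means a client
stops receiving updates 24 hours after its last query, not after its last
notification.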

As an optimization, the mapping of nodes to latest status can also be a cache
with a TTL, since you don't need to keep the statuses of nodes that haven't
changed in more than 24 hours: you'll never receive a delayed node status query
to match them against.
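One way to sketch such a TTL'ed mapping in memory, assuming updates arrive with
non-decreasing timestamps so that insertion order doubles as expiry order.
`TtlMap` is a hypothetical name, and unlike the store requested in this issue it
lives entirely in memory.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;

// Hypothetical event-time TTL map: an entry expires ttlMs after its last write.
// Assumes non-decreasing write timestamps, so the head of the map is always the
// oldest entry and expiry only needs to scan from the front.
class TtlMap<K, V> {
    private record Stamped<T>(T value, long ts) {}

    private final long ttlMs;
    private final LinkedHashMap<K, Stamped<V>> map = new LinkedHashMap<>();

    TtlMap(long ttlMs) { this.ttlMs = ttlMs; }

    void put(K key, V value, long ts) {
        map.remove(key);                       // re-insert so the key moves to the tail
        map.put(key, new Stamped<>(value, ts));
        expire(ts);
    }

    // Returns the value if it was written no more than ttlMs before `now`, else null.
    V get(K key, long now) {
        Stamped<V> s = map.get(key);
        return (s == null || now - s.ts() > ttlMs) ? null : s.value();
    }

    // Drops entries older than ttlMs, stopping at the first live one.
    void expire(long now) {
        Iterator<Stamped<V>> it = map.values().iterator();
        while (it.hasNext() && now - it.next().ts() > ttlMs) {
            it.remove();
        }
    }

    int size() { return map.size(); }
}
```

With out-of-order timestamps the head of the map is no longer guaranteed to be
the oldest entry, so a separate index ordered by timestamp (for example a
`TreeMap`) would be needed instead.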

Abstractly this is equivalent to a {{KTable}}-{{KTable}} inner join where
entries in each {{KTable}} expire after some TTL, and where one table has a
composite primary key (node id and client id on one {{KTable}} vs just node id
on the other).
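Viewed as a snapshot, that join can be sketched as follows.  All names
(`CompositeKeyJoin`, `NodeClient`, `Row`, `Joined`) are hypothetical, expired
rows are simply treated as absent on both sides, and this is not how Kafka
Streams evaluates {{KTable}} joins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical snapshot join: (nodeId, clientId) -> last status sent,
// inner-joined with nodeId -> latest status, ignoring rows older than the TTL.
class CompositeKeyJoin {
    record NodeClient(String nodeId, String clientId) {}
    record Row(String status, long ts) {}
    record Joined(NodeClient key, String sentStatus, String latestStatus) {}

    static List<Joined> join(Map<NodeClient, Row> sent, Map<String, Row> latest,
                             long now, long ttlMs) {
        List<Joined> out = new ArrayList<>();
        for (Map.Entry<NodeClient, Row> e : sent.entrySet()) {
            Row s = e.getValue();
            if (now - s.ts() > ttlMs) continue;              // expired on the left side
            Row l = latest.get(e.getKey().nodeId());         // join on the node id part only
            if (l == null || now - l.ts() > ttlMs) continue; // missing or expired on the right
            out.add(new Joined(e.getKey(), s.status(), l.status()));
        }
        return out;
    }
}
```

Notifications would then be the joined rows whose `sentStatus` differs from
`latestStatus`.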

It could also be thought of as a windowed {{KTable}}-{{KTable}} join (although
in that case records that fall outside the window would never be used and
would just waste space), or as a windowed {{KStream}}-{{KStream}} join of table
updates where only the latest updated values are used (i.e. updates in the
window are discarded if there is a newer update).  Again, though, these would
be joins where the primary keys are not identical, as one is a composite.

Alas, Kafka Streams does not support windowed {{KTable}}-{{KTable}} joins, 
TTL'ed {{KeyValueStore}} s, or joins across {{KTable}} s and/or {{KStream}} s 
with different keys.

That said, the above service can be implemented by joining the client node
status query stream and the node status update stream using custom processors
and by abusing {{WindowStore}}.  {{WindowStore}} can be used as a form of TTL'ed
{{KeyValueStore}}, as it drops old values that fall out of its window; iterating
over a key's values in reverse order and using only the latest one gives
key-value semantics.  And since it allows you to store multiple values for the
same key (node id), you can record the node status you handed out to clients
(node id as key; client id, status, and timestamp as value).  Then, when a node
status update comes in and you perform the join, you iterate over all of them
for that node id, keeping only the latest one for each client id.
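A plain-Java model of that {{WindowStore}} trick, assuming records are appended
in timestamp order.  `WindowedSentStore` and its methods are hypothetical and do
not use the real {{WindowStore}} interface; expiry here is per record, whereas a
window store drops whole segments.  For simplicity, a notification is recorded
at the update time, which also restarts that client's 24-hour window.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model: each node id maps to every (clientId, status, ts) record
// written within the retention window, oldest first.
class WindowedSentStore {
    record Sent(String clientId, String status, long ts) {}

    private final long retentionMs;
    private final Map<String, Deque<Sent>> byNode = new HashMap<>();

    WindowedSentStore(long retentionMs) { this.retentionMs = retentionMs; }

    // Append-only, like a window store: older values for the same key are kept.
    void put(String nodeId, String clientId, String status, long ts) {
        byNode.computeIfAbsent(nodeId, k -> new ArrayDeque<>())
              .addLast(new Sent(clientId, status, ts));
    }

    // Latest record per client for nodeId, within the window ending at `now`.
    List<Sent> latestPerClient(String nodeId, long now) {
        Deque<Sent> records = byNode.getOrDefault(nodeId, new ArrayDeque<>());
        // Drop records that fell out of the window.
        while (!records.isEmpty() && now - records.peekFirst().ts() > retentionMs) {
            records.pollFirst();
        }
        Set<String> seen = new HashSet<>();
        List<Sent> latest = new ArrayList<>();
        for (Iterator<Sent> it = records.descendingIterator(); it.hasNext(); ) {
            Sent s = it.next();
            if (seen.add(s.clientId())) latest.add(s); // first hit in reverse order is newest
        }
        return latest;
    }

    // The join: clients whose latest handed-out status differs get notified.
    List<String> onNodeUpdate(String nodeId, String newStatus, long now) {
        List<String> toNotify = new ArrayList<>();
        for (Sent s : latestPerClient(nodeId, now)) {
            if (!s.status().equals(newStatus)) {
                toNotify.add(s.clientId());
                put(nodeId, s.clientId(), newStatus, now); // remember what was handed out
            }
        }
        return toNotify;
    }
}
```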



> Add a key-value store that is a TTL persistent cache
> ----------------------------------------------------
>
>                 Key: KAFKA-4212
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4212
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.1
>            Reporter: Elias Levy
>            Assignee: Guozhang Wang
>
> Some jobs need to maintain as state a large set of key-values for some
> period of time, i.e. they need to maintain a TTL cache of values potentially
> larger than memory.
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowStore}}, a {{WindowStore}}, can expire items via segment
> dropping, but it stores multiple items per key, based on their timestamp.
> However, this store can be repurposed as a cache by fetching the items in
> reverse chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowStore}} can be repurposed as a cache, it would be
> useful to have an official and proper TTL cache API and implementation.
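A rough sketch of the segment-based repurposing described in the issue: writes
land in a time segment, lookups scan segments newest-first and return the first
hit, and expiry drops whole segments.  `SegmentedTtlCache` and its parameters
are hypothetical; it is purely in-memory (so it does not address the
larger-than-memory requirement), and its TTL is only as precise as the segment
size.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical segmented TTL cache. Assumes non-negative timestamps.
class SegmentedTtlCache<K, V> {
    private final long segmentMs;
    private final long retentionMs;
    private long streamTime = 0; // largest timestamp observed so far
    // segment id (ts / segmentMs) -> entries written during that segment
    private final TreeMap<Long, Map<K, V>> segments = new TreeMap<>();

    SegmentedTtlCache(long segmentMs, long retentionMs) {
        this.segmentMs = segmentMs;
        this.retentionMs = retentionMs;
    }

    // Segments with a smaller id lie entirely before streamTime - retentionMs.
    private long oldestLiveSegment() {
        return (streamTime - retentionMs) / segmentMs;
    }

    void put(K key, V value, long ts) {
        streamTime = Math.max(streamTime, ts);
        if (ts / segmentMs < oldestLiveSegment()) return; // too old: would be dropped anyway
        segments.computeIfAbsent(ts / segmentMs, id -> new HashMap<>()).put(key, value);
        segments.headMap(oldestLiveSegment(), false).clear(); // drop expired segments wholesale
    }

    // Newest value for key, or null; scans segments in reverse chronological order.
    V get(K key) {
        for (Map<K, V> segment : segments.descendingMap().values()) {
            V v = segment.get(key);
            if (v != null) return v;
        }
        return null;
    }

    int segmentCount() { return segments.size(); }
}
```

Dropping a whole segment is a single cheap operation, which is why this layout
expires data efficiently, at the cost of keeping some entries up to one segment
longer than the nominal retention.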



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
