[ 
https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341298#comment-16341298
 ] 

Stefan Richter edited comment on FLINK-3089 at 1/26/18 5:12 PM:
----------------------------------------------------------------

[~phoenixjiangnan] I agree that we could start with a relaxed TTL approach. I 
would not limit the feature to RocksDB; in fact, I am also considering 
implementing incremental snapshots for the heap backend and have an approach 
for how this could be done.

For TTL on the heap backend, I also have some ideas for how this could work for 
the async variant (see `CopyOnWriteStateTable`, which is the default since 1.4 
and might eventually become the only implementation). For example, we might go 
for an approach that works similarly to the incremental rehash: a linear scan 
over the directory that removes outdated entries over time. This scan is 
performed in very small steps and driven by other operations, e.g. a small 
fraction of the buckets (maybe just one) is cleaned up as a side activity of 
every operation on the map, to amortize the cleanup costs. Because the scan is 
linear, at least the accesses to the bucket array are also cache conscious. 
Besides, we can of course also drop all outdated entries that we encounter 
during regular operations. In general, outdated entries could be detected by an 
attached timestamp (introducing more memory overhead per entry), or we could 
try to correlate the timeout with the state version that already exists on 
every entry in this map.
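
To make the idea a bit more concrete, here is a minimal sketch of the 
timestamp-based variant. It is not the `CopyOnWriteStateTable` code and 
ignores snapshots and resizing entirely; the class `IncrementalTtlMap`, its 
methods, and the injected clock are all hypothetical names made up for 
illustration. Every `put`/`get` drops outdated entries in the bucket it 
touches and additionally cleans exactly one bucket at a cursor that moves 
linearly over the bucket array.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch only; not Flink's CopyOnWriteStateTable. */
class IncrementalTtlMap<K, V> {

    private static final class Entry<K, V> {
        final K key;
        V value;
        long lastUpdate; // attached timestamp: the extra memory overhead per entry
        Entry<K, V> next;

        Entry(K key, V value, long lastUpdate, Entry<K, V> next) {
            this.key = key;
            this.value = value;
            this.lastUpdate = lastUpdate;
            this.next = next;
        }
    }

    private final Entry<K, V>[] buckets;
    private final long ttlMillis;
    private final LongSupplier clock; // injected so that time can be controlled
    private int cleanupCursor;        // position of the linear cleanup scan
    private int size;

    @SuppressWarnings("unchecked")
    IncrementalTtlMap(int numBuckets, long ttlMillis, LongSupplier clock) {
        this.buckets = (Entry<K, V>[]) new Entry[numBuckets];
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    void put(K key, V value) {
        long now = clock.getAsLong();
        int i = indexOf(key);
        pruneBucket(i, now); // drop outdated entries we encounter anyway
        Entry<K, V> e = buckets[i];
        while (e != null && !e.key.equals(key)) {
            e = e.next;
        }
        if (e != null) {
            e.value = value;
            e.lastUpdate = now; // an update refreshes the timeout
        } else {
            buckets[i] = new Entry<>(key, value, now, buckets[i]);
            size++;
        }
        cleanupStep(now);
    }

    V get(K key) {
        long now = clock.getAsLong();
        int i = indexOf(key);
        pruneBucket(i, now);
        V result = null;
        for (Entry<K, V> e = buckets[i]; e != null; e = e.next) {
            if (e.key.equals(key)) {
                result = e.value;
                break;
            }
        }
        cleanupStep(now);
        return result;
    }

    /** Number of entries currently held, including not-yet-cleaned outdated ones. */
    int size() {
        return size;
    }

    /** Side activity of every operation: clean one bucket, then advance the cursor. */
    private void cleanupStep(long now) {
        pruneBucket(cleanupCursor, now);
        cleanupCursor = (cleanupCursor + 1) % buckets.length;
    }

    /** Unlinks all entries in bucket i whose timestamp is older than the TTL. */
    private void pruneBucket(int i, long now) {
        Entry<K, V> prev = null;
        Entry<K, V> e = buckets[i];
        while (e != null) {
            if (now - e.lastUpdate >= ttlMillis) {
                if (prev == null) {
                    buckets[i] = e.next;
                } else {
                    prev.next = e.next;
                }
                size--;
            } else {
                prev = e;
            }
            e = e.next;
        }
    }

    private int indexOf(K key) {
        return (key.hashCode() & 0x7fffffff) % buckets.length;
    }
}
```

With one bucket cleaned per operation, a full pass over the directory takes 
as many operations as there are buckets, so an outdated entry can linger for 
a while after its timeout. That matches the relaxed TTL semantics discussed 
above rather than a strict guarantee.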


> State API Should Support Data Expiration (State TTL)
> ----------------------------------------------------
>
>                 Key: FLINK-3089
>                 URL: https://issues.apache.org/jira/browse/FLINK-3089
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API, State Backends, Checkpointing
>            Reporter: Niels Basjes
>            Assignee: Bowen Li
>            Priority: Major
>
> In some use cases (web analytics) there is a need to keep state per visitor 
> on a website (i.e. keyBy(sessionid)).
> At some point the visitor simply leaves and no longer creates new events (so 
> a special 'end of session' event will not occur).
> The only way to determine that a visitor has left is by choosing a timeout, 
> like "After 30 minutes without events we consider the visitor 'gone'".
> Only after this (chosen) timeout has expired should we discard this state.
> In the Trigger part of Windows we can set a timer and close/discard this kind 
> of information. But that introduces the buffering effect of the window (which 
> in some scenarios is unwanted).
> What I would like is to be able to set a timeout on a specific state which I 
> can update afterwards.
> This makes it possible to create a map function that assigns the right value 
> and that discards the state automatically.



