Hi,

Could a process function [1] meet your needs here? You can implement the TTL logic yourself using timers in a process function.
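To make the idea concrete, here is a minimal sketch of the bookkeeping in plain Java. It is a model only, not Flink API: the class `ExpiringElements` and its methods are hypothetical names chosen for illustration. In a real job the map would presumably be a `MapState<Long, List<T>>` in a `KeyedProcessFunction`, the timer would be registered with `ctx.timerService().registerEventTimeTimer(ts)` (or `registerProcessingTimeTimer(ts)`), and the removal would happen in `onTimer(timestamp, ...)`. The key point is that a timer carries only its timestamp, so the state is indexed by that timestamp and the firing timer can look up its elements directly. Rounding the timestamp up to a coarser resolution is my assumption for how to coalesce timers without expiring anything early.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Plain-Java model of timer-driven expiry; names are hypothetical, not Flink API. */
class ExpiringElements<T> {
    private final long ttlMillis;
    private final long timerResolutionMillis;

    // Plays the role of keyed MapState: timer timestamp -> elements that expire then.
    private final TreeMap<Long, List<T>> byTimer = new TreeMap<>();

    ExpiringElements(long ttlMillis, long timerResolutionMillis) {
        this.ttlMillis = ttlMillis;
        this.timerResolutionMillis = timerResolutionMillis;
    }

    /** Adds an element and returns the timestamp of the timer that will remove it. */
    long add(T element, long nowMillis) {
        long expiry = nowMillis + ttlMillis;
        // Round up to the next multiple of the resolution (assumes non-negative
        // timestamps), so that many elements share one timer and none expires early.
        long timer = ((expiry + timerResolutionMillis - 1) / timerResolutionMillis)
                * timerResolutionMillis;
        byTimer.computeIfAbsent(timer, k -> new ArrayList<>()).add(element);
        return timer;
    }

    /** Models onTimer(timestamp): removes exactly the elements stored under that timer. */
    List<T> onTimer(long timerTimestamp) {
        List<T> expired = byTimer.remove(timerTimestamp);
        return expired == null ? new ArrayList<>() : expired;
    }

    /** Models time (or the watermark) advancing: fires all due timers in order. */
    List<T> advanceTo(long nowMillis) {
        List<T> expired = new ArrayList<>();
        while (!byTimer.isEmpty() && byTimer.firstKey() <= nowMillis) {
            expired.addAll(onTimer(byTimer.firstKey()));
        }
        return expired;
    }

    /** Number of elements currently held. */
    int size() {
        int n = 0;
        for (List<T> bucket : byTimer.values()) {
            n += bucket.size();
        }
        return n;
    }
}
```

With a TTL of 5000 ms and a resolution of 1000 ms, elements added at 100 ms and 900 ms would both be stored under the timer at 6000 ms and removed together when it fires, without iterating over the rest of the state.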
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html

Best,
Congxian

Timo Walther <twal...@apache.org> wrote on Wed, Jun 10, 2020 at 9:36 PM:

> Hi Annemarie,
>
> if TTL is what you are looking for and queryable state is what limits
> you, it might make sense to come up with a custom implementation of
> queryable state? TTL might be more difficult to implement. As far as I
> know this feature is more of an experimental feature without any
> consistency guarantees. A Function could offer this functionality using
> some socket/web service library. Or you offer insights through a side
> output into a sink such as Elasticsearch.
>
> Otherwise, it might be useful to "batch" the cleanups. In Flink's SQL
> engine, a user can define a minimum and maximum retention time. So
> timers are always set based on the maximum retention time but during
> cleanup the elements that fall into the minimum retention time are also
> cleaned up on the way (see [1]). This could be a performance improvement.
>
> If the clean up happens based on event-time, it is also possible to use
> timers more efficiently and only set one timer per watermark [2].
>
> I hope this helps.
>
> Regards,
> Timo
>
> [1] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/BaseTwoInputStreamOperatorWithStateRetention.scala
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#timer-coalescing
>
> On 09.06.20 16:29, Annemarie Burger wrote:
> > Hi,
> >
> > What I'm trying to do is the following: I want to incrementally add
> > elements to and delete elements from a state. If an element
> > expires/goes out of the window, it needs to be removed from the state.
> > I basically want the functionality of TTL, without using it, since I'm
> > also using Queryable State and these two features can't be combined.
> > Of course I can give a "valid until" time to each element when I'm
> > adding it to the state using a ProcessFunction, and periodically
> > iterate over the state to remove expired elements, but I was wondering
> > if there is a more efficient way. For example, to use a timer, which
> > we give the element as a parameter, so that when the timer fires, x
> > seconds after the timer was set, it can just look up the element
> > directly and remove it. But how would I implement this?
> >
> > Thanks!
> >
> > --
> > Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/