Hi,
You can register a processing-time timer in the open function and again
inside onTimer to get a timer that runs periodically.
Pseudo-code example:
|ValueState<Long> lastRuntime;

void open() {
    ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
}

void onTimer() {
    // Run the periodic task
    if (lastRuntime.value() + 60000 == timeStamp) {
        periodicTask();
    }
    // Re-register the processing time timer
    lastRuntime.update(timeStamp);
    ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
}

void periodicTask()|
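To make the re-registration loop concrete, here is a minimal plain-Java sketch that runs without Flink on the classpath. It is only a simulation under stated assumptions: PeriodicTimerSketch, start and advanceTo are hypothetical names, the priority queue stands in for the timer service, and start() stands in for wherever the first timer gets registered in the real job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-in for a timer service; not a Flink class.
final class PeriodicTimerSketch {
    private static final long INTERVAL_MS = 60_000L;

    // Pending timer timestamps, earliest first.
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    // Timestamps at which the periodic task ran.
    private final List<Long> runs = new ArrayList<>();

    // Plays the role of the initial timer registration.
    void start(long now) {
        timers.add(now + INTERVAL_MS);
    }

    // Plays the role of onTimer: do the work, then re-register.
    private void onTimer(long timestamp) {
        runs.add(timestamp);                 // the periodic task
        timers.add(timestamp + INTERVAL_MS); // schedule the next run
    }

    // Advance the simulated clock and fire every timer due up to `now`.
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.peek() <= now) {
            onTimer(timers.poll());
        }
    }

    List<Long> runs() {
        return runs;
    }
}
```

Each firing schedules exactly one follow-up timer, so there is never more than one pending timer regardless of how long the loop runs.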
For the second question, timers are already scoped by key, so you can
keep a lastModified value in a ValueState,
then compare it to the timestamp passed to onTimer to decide whether the
current key should be evicted.
Check out the example on the ProcessFunction page:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
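The lastModified comparison can be sketched in plain Java, again without Flink. This is an illustration, not the Flink API: StaleKeyTracker and its methods are hypothetical names, the map stands in for the per-key ValueState, and the key is passed explicitly only because there is no keyed context here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the per-key eviction check; not a Flink class.
final class StaleKeyTracker {
    private final long timeoutMs;
    // Stands in for a ValueState<Long> lastModified, one entry per key.
    private final Map<String, Long> lastModified = new HashMap<>();

    StaleKeyTracker(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // processElement role: record the update and return the timestamp
    // of the timer that should be registered for this key.
    long onElement(String key, long now) {
        lastModified.put(key, now);
        return now + timeoutMs;
    }

    // onTimer role: evict only if no newer element arrived after the
    // element that scheduled this timer.
    boolean onTimer(String key, long timerTimestamp) {
        Long last = lastModified.get(key);
        if (last != null && timerTimestamp == last + timeoutMs) {
            lastModified.remove(key);
            return true;
        }
        return false;
    }

    boolean contains(String key) {
        return lastModified.containsKey(key);
    }
}
```

A timer scheduled by an older element no longer matches lastModified + timeout once a newer element has arrived, so it is ignored and only the most recent timer evicts the key.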
Best regards,
Kien
On 9/5/2017 11:49 AM, Navneeth Krishnan wrote:
Hi All,
I have a streaming pipeline which is keyed by userid and then passed to a
flatMap function. I need to clear the state after some time, and I was
looking at ProcessFunction for that.
If I register a timer inside the processElement function, wouldn't it
create a timer for each incoming message?
|// schedule the next timer 60 seconds from the current event time
ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);|
How can I get something like a cleanup task that runs every 2 minutes
and evicts all stale data? Also, is there a way to get the key inside
the onTimer function so that I know which key has to be evicted?
Thanks,
Navneeth