Thanks, that helped me see how we could implement this!

On Wed, Sep 6, 2017 at 12:01 PM, Timo Walther <twal...@apache.org> wrote:
> Hi Johannes,
>
> you can find the implementation of the state cleanup here:
> https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala
>
> and an example usage here:
> https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
>
> Regards,
> Timo
>
>
> On 06.09.17 at 10:50, Aljoscha Krettek wrote:
>
> Hi,
>
> I'm actually not very familiar with the current Table API implementations,
> but Fabian or Timo (cc'ed) should know more. I strongly suspect that it
> is implemented like this, yes.
>
> Best,
> Aljoscha
>
> On 5. Sep 2017, at 21:14, Johannes Schulte <johannes.schu...@gmail.com>
> wrote:
>
> Hi,
>
> one short question I had that fits here. When using the higher-level streaming
> APIs, we can set a min and max retention time [1], which is probably used to reduce
> the number of timers registered under the hood. How is this implemented? By
> registering a "clamped" timer?
>
> Thanks,
>
> Johannes
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/streaming.html#idle-state-retention-time
>
> On Tue, Sep 5, 2017 at 5:17 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>>
>> This is mostly correct, but you cannot register a timer in open() because
>> we don't have an active key there. You can only register a timer in
>> processElement() and onTimer().
>>
>> In your case, I would suggest either clamping the timestamp to the
>> nearest 2-minute (or whatever) interval or keeping an extra ValueState that
>> tells you whether you have already registered a timer.
>>
>> Best,
>> Aljoscha
>>
>> On 5. Sep 2017, at 16:55, Kien Truong <duckientru...@gmail.com> wrote:
>>
>> Hi,
>>
>> You can register a processing time timer inside the onTimer and the open
>> function to have a timer that runs periodically.
>>
>> Pseudo-code example:
>>
>> ValueState<Long> lastRuntime;
>>
>> void open() {
>>     ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
>> }
>>
>> void onTimer(long timestamp) {
>>     // Run the periodic task
>>     if (lastRuntime.value() + 60000 == timestamp) {
>>         periodicTask();
>>     }
>>     // Re-register the processing time timer
>>     lastRuntime.update(timestamp);
>>     ctx.timerService().registerProcessingTimeTimer(timestamp + 60000);
>> }
>>
>> void periodicTask() { /* periodic work */ }
>>
>>
>> For the second question, timers are already scoped by key, so you can keep
>> a lastModified variable as a ValueState,
>> then compare it to the timestamp provided by the timer to see if the
>> current key should be evicted.
>> Check out the example on the ProcessFunction page.
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
>>
>> Best regards,
>> Kien
>>
>> On 9/5/2017 11:49 AM, Navneeth Krishnan wrote:
>>
>> Hi All,
>>
>> I have a streaming pipeline which is keyed by userid and then passed to a
>> flatMap function. I need to clear the state after some time and I was
>> looking at ProcessFunction for it.
>>
>> If I register a timer inside processElement(), wouldn't it
>> create a timer for each incoming message?
>>
>> // schedule the next timer 60 seconds from the current event time
>> ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);
>>
>> How can I get something like a cleanup task that runs every 2 minutes and
>> evicts all stale data? Also, is there a way to get the key inside the onTimer
>> function so that I know which key has to be evicted?
>>
>> Thanks,
>> Navneeth
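Aljoscha's first suggestion, clamping the timer timestamp to an interval boundary, addresses Navneeth's worry about one timer per message: Flink keeps only one timer per key and timestamp, so all events whose cleanup time clamps to the same boundary share a single timer. The plain-Java simulation below shows the effect without a Flink dependency. The `timers` map is a hypothetical stand-in for the per-key timer service, and `TimerCoalescing`, `clampUp`, and `register` are illustrative names; in a ProcessFunction the clamped value would instead be passed to `ctx.timerService().registerEventTimeTimer(...)` or `registerProcessingTimeTimer(...)`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/** Simulation of timer coalescing: cleanup times are clamped up to an interval boundary. */
public class TimerCoalescing {

    // Hypothetical stand-in for the per-key timer service. A Set mirrors the fact that
    // registering the same (key, timestamp) twice does not create a second timer.
    static final Map<String, TreeSet<Long>> timers = new HashMap<>();

    /** Rounds ts up to the next multiple of intervalMs (unchanged if already aligned). */
    static long clampUp(long ts, long intervalMs) {
        return ((ts + intervalMs - 1) / intervalMs) * intervalMs;
    }

    /** Registers a cleanup timer timeoutMs after eventTime, clamped to intervalMs. */
    static void register(String key, long eventTime, long timeoutMs, long intervalMs) {
        long cleanupTime = clampUp(eventTime + timeoutMs, intervalMs);
        timers.computeIfAbsent(key, k -> new TreeSet<>()).add(cleanupTime);
    }

    public static void main(String[] args) {
        long timeout = 60_000L;   // 60 s timeout, as in the thread
        long interval = 120_000L; // 2-minute clamp, as Aljoscha suggests
        // 1000 events for one user, one every 100 ms (0 s .. 99.9 s)
        for (int i = 0; i < 1000; i++) {
            register("user-1", i * 100L, timeout, interval);
        }
        // Cleanup times span 60.0 s .. 159.9 s and clamp to just two boundaries.
        System.out.println(timers.get("user-1")); // [120000, 240000]
    }
}
```

Rounding up rather than down means state is never cleared before the timeout, at the cost of living up to one interval longer. When a clamped timer fires, the handler should still compare the firing time with a stored lastModified value, as Kien describes, since the key may have seen newer events after the timer was registered.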
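Aljoscha's other suggestion, an extra ValueState recording whether a timer is already pending, can be combined with Kien's lastModified check to get a per-key periodic cleanup with at most one outstanding timer per key. The sketch below simulates this in plain Java under stated assumptions: the two maps stand in for keyed ValueStates, a PriorityQueue stands in for the timer service, and every name is hypothetical. Because Flink calls onTimer() with the timer's key as the current key, keyed state read there belongs to that key; the simulation passes the key explicitly to mimic this. Unlike Kien's pseudo-code, the first timer is registered from processElement() rather than open(), following Aljoscha's correction.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

/** Simulation: one pending timer per key, evicting keys that have been idle too long. */
public class IdleKeyEviction {

    static final long CHECK_INTERVAL = 120_000L; // re-check each key every 2 minutes
    static final long IDLE_TIMEOUT = 120_000L;   // evict keys idle for at least 2 minutes

    // Hypothetical stand-ins for two keyed ValueStates.
    static final Map<String, Long> lastModified = new HashMap<>();
    static final Map<String, Boolean> timerRegistered = new HashMap<>();

    // Hypothetical stand-in for the timer service: pending timers ordered by fire time.
    static final class Timer {
        final long time;
        final String key;
        Timer(long time, String key) { this.time = time; this.key = key; }
    }
    static final PriorityQueue<Timer> timerQueue =
            new PriorityQueue<>((a, b) -> Long.compare(a.time, b.time));

    static void processElement(String key, long now) {
        lastModified.put(key, now);
        // Register a timer only if this key has none pending (the extra ValueState flag).
        if (!timerRegistered.getOrDefault(key, false)) {
            timerQueue.add(new Timer(now + CHECK_INTERVAL, key));
            timerRegistered.put(key, true);
        }
    }

    static void onTimer(String key, long timestamp) {
        Long last = lastModified.get(key);
        if (last == null) {
            return; // nothing stored for this key
        }
        if (timestamp - last >= IDLE_TIMEOUT) {
            // Idle long enough: clear all state for this key and do not re-register.
            lastModified.remove(key);
            timerRegistered.remove(key);
        } else {
            // Still active: schedule the next periodic check for this key.
            timerQueue.add(new Timer(timestamp + CHECK_INTERVAL, key));
        }
    }

    // Fires every pending timer with time <= now, in time order.
    static void advanceTo(long now) {
        while (!timerQueue.isEmpty() && timerQueue.peek().time <= now) {
            Timer t = timerQueue.poll();
            onTimer(t.key, t.time);
        }
    }

    public static void main(String[] args) {
        processElement("alice", 0L);
        processElement("bob", 0L);
        processElement("alice", 10_000L);          // alice already has a timer
        System.out.println(timerQueue.size());     // 2
        processElement("alice", 100_000L);         // alice stays active
        advanceTo(120_000L);                        // both timers fire at 120 s
        System.out.println(lastModified.keySet()); // [alice]
    }
}
```

With this scheme an idle key is evicted somewhere between IDLE_TIMEOUT and IDLE_TIMEOUT + CHECK_INTERVAL after its last event, and each key never has more than one timer pending.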
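Johannes' question about how a min/max retention time reduces timers is answered in the thread only by the link to ProcessFunctionWithCleanupState. The plain-Java simulation below shows one way a min/max window can bound the number of timers; it is written from the description of the feature and may differ in detail from the linked Scala class. The idea: register a new cleanup timer only when the pending one would fire before `now + minRetention`, and set it to `now + maxRetention`; when a timer fires, clear state only if it matches the latest stored cleanup time. All names (`RetentionCleanup`, `onAccess`, the 10/15-minute values) are hypothetical; in a real ProcessFunction `cleanupTime` would be a `ValueState<Long>` and the timer would be registered with `ctx.timerService().registerProcessingTimeTimer(...)`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simulation: idle state is kept at least minRetention and at most maxRetention. */
public class RetentionCleanup {

    final long minRetention;
    final long maxRetention;

    // Stand-in for a keyed ValueState<Long> holding the pending cleanup time.
    final Map<String, Long> cleanupTime = new HashMap<>();
    // Stand-in for the keyed state that should be cleared when the key goes idle.
    final Map<String, Long> state = new HashMap<>();
    // Every cleanup timer registered, kept only to count them.
    final List<Long> registeredTimers = new ArrayList<>();

    RetentionCleanup(long minRetention, long maxRetention) {
        this.minRetention = minRetention;
        this.maxRetention = maxRetention;
    }

    // Called for every element of `key` at processing time `now`.
    void onAccess(String key, long now) {
        state.put(key, now);
        Long pending = cleanupTime.get(key);
        // Register a later timer only if the pending one would fire before now + minRetention.
        if (pending == null || now + minRetention > pending) {
            long next = now + maxRetention;
            registeredTimers.add(next);
            cleanupTime.put(key, next);
        }
    }

    // Called when a cleanup timer for `key` fires at `timestamp`.
    void onTimer(String key, long timestamp) {
        Long pending = cleanupTime.get(key);
        // A timer superseded by a later one no longer matches and is ignored.
        if (pending != null && pending == timestamp) {
            state.remove(key);
            cleanupTime.remove(key);
        }
    }

    public static void main(String[] args) {
        // Illustrative values: keep idle state at least 10 and at most 15 minutes.
        RetentionCleanup r = new RetentionCleanup(600_000L, 900_000L);
        for (long t = 0; t < 600_000L; t += 1_000L) { // one access per second for 10 minutes
            r.onAccess("user-1", t);
        }
        System.out.println(r.registeredTimers);            // [900000, 1201000]
        r.onTimer("user-1", 900_000L);                     // stale timer: ignored
        System.out.println(r.state.containsKey("user-1")); // true
        r.onTimer("user-1", 1_201_000L);                   // latest timer: clears state
        System.out.println(r.state.containsKey("user-1")); // false
    }
}
```

With these numbers, 600 accesses register only two timers, and the state is cleared 602 seconds after the last access, inside the 10 to 15 minute window. Superseded timers are not deleted; when they fire they find a newer cleanup time in state and do nothing.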