Hi Johannes,

You can find the implementation of the state cleanup here:
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala

and an example usage here:
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
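
The following is a rough, Flink-free Java simulation of how a minimum and maximum retention time can keep the number of registered timers low. It sketches the concept only and is not the code of ProcessFunctionWithCleanupState. The class RetentionSketch and its methods are hypothetical. The sketch registers a new cleanup timer at now + maxRetention only when the existing timer would fire before now + minRetention. When an older timer fires after being superseded, it is ignored.

```java
import java.util.HashMap;
import java.util.Map;

// Flink-free simulation of idle-state retention with a min/max retention time.
// RetentionSketch and its methods are hypothetical, not Flink API.
class RetentionSketch {
    private final long minRetention;
    private final long maxRetention;
    // Stand-in for per-key state: cleanup time of the last registered timer.
    private final Map<String, Long> cleanupTime = new HashMap<>();
    // How many timers would have been registered with a timer service.
    private int timersRegistered = 0;

    RetentionSketch(long minRetention, long maxRetention) {
        if (maxRetention < minRetention) {
            throw new IllegalArgumentException("maxRetention must be >= minRetention");
        }
        this.minRetention = minRetention;
        this.maxRetention = maxRetention;
    }

    // Called for every record of `key` at processing time `now`.
    void onRecord(String key, long now) {
        Long current = cleanupTime.get(key);
        // Register a later timer only if the existing one would fire before
        // now + minRetention, i.e. could clear state that must still be kept.
        if (current == null || now + minRetention > current.longValue()) {
            cleanupTime.put(key, now + maxRetention);
            timersRegistered++;
        }
    }

    // Called when a timer for `key` fires at `timestamp`; true if state was cleared.
    boolean onTimer(String key, long timestamp) {
        Long current = cleanupTime.get(key);
        // Timers superseded by a later registration are ignored.
        if (current != null && current.longValue() == timestamp) {
            cleanupTime.remove(key);
            return true;
        }
        return false;
    }

    boolean hasState(String key) {
        return cleanupTime.containsKey(key);
    }

    int timersRegistered() {
        return timersRegistered;
    }
}
```

With minRetention = 10 s and maxRetention = 20 s, records at 0 s, 5 s and 9.999 s share a single timer at 20 s. A record at 10.001 s registers a second timer at 30.001 s. When the 20 s timer then fires, it clears nothing.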

Regards,
Timo


On 06.09.17 at 10:50, Aljoscha Krettek wrote:
Hi,

I'm actually not very familiar with the current Table API implementation, but Fabian or Timo (cc'ed) should know more. I strongly suspect that it is implemented like this, yes.

Best,
Aljoscha

On 5. Sep 2017, at 21:14, Johannes Schulte <johannes.schu...@gmail.com> wrote:

Hi,

One short question that fits here: when using the higher-level streaming APIs (Table API/SQL), we can set a minimum and maximum retention time [1], which is probably used to reduce the number of timers registered under the hood. How is this implemented? By registering a "clamped" timer?

Thanks,

Johannes

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/streaming.html#idle-state-retention-time

On Tue, Sep 5, 2017 at 5:17 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

    Hi,

    This is mostly correct, but you cannot register a timer in open()
    because there is no active key there. You can only register a
    timer in processElement() and onTimer().
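
    Because timers can only be registered where a key is active, one
    way to start a periodic per-key check is to register the first
    timer lazily in processElement() and re-register it from
    onTimer(). Below is a Flink-free Java simulation of that pattern.
    The class and method names are hypothetical, and the maps stand in
    for per-key ValueState and the timer service. It is a sketch, not
    Flink API code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Flink-free simulation (hypothetical names) of a per-key periodic timer
// that is started lazily in processElement() instead of open().
class LazyPeriodicTimer {
    static final long INTERVAL = 120_000L; // run the check every 2 minutes

    // Stand-ins for per-key ValueState: pending timer and last activity time.
    private final Map<String, Long> pendingTimer = new HashMap<>();
    private final Map<String, Long> lastSeen = new HashMap<>();
    // Simulated timer service: timestamp -> keys with a timer at that time.
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();

    void processElement(String key, long now) {
        lastSeen.put(key, now);
        // The "already registered?" flag: only the first element starts the cycle.
        if (!pendingTimer.containsKey(key)) {
            registerTimer(key, now + INTERVAL);
        }
    }

    // Advances processing time, fires due timers and returns the evicted keys.
    List<String> advanceTo(long now) {
        List<String> evicted = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Map.Entry<Long, Set<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                onTimer(key, due.getKey(), evicted);
            }
        }
        return evicted;
    }

    private void onTimer(String key, long timestamp, List<String> evicted) {
        pendingTimer.remove(key);
        if (timestamp - lastSeen.get(key) >= INTERVAL) {
            // Idle for at least one full interval: drop all state for the key.
            lastSeen.remove(key);
            evicted.add(key);
        } else {
            // Still active: re-register so the check runs again in 2 minutes.
            registerTimer(key, timestamp + INTERVAL);
        }
    }

    private void registerTimer(String key, long timestamp) {
        pendingTimer.put(key, timestamp);
        timers.computeIfAbsent(timestamp, t -> new HashSet<>()).add(key);
    }

    int pendingTimers() {
        return pendingTimer.size();
    }
}
```

    If a key is still active when its timer fires, it gets a new timer
    one interval later. A key that has been idle for a full interval is
    evicted and registers nothing further, so each key holds at most
    one pending timer.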

    In your case, I would suggest either clamping the timestamp to the
    nearest 2-minute (or whatever) interval, or keeping an extra
    ValueState that tells you whether you have already registered a timer.
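
    Clamping can be illustrated without Flink. Suppose every element
    registers a cleanup timer, but its timestamp is rounded up to the
    next 2-minute boundary. Many elements then map to the same
    timestamp. Flink keeps only one timer per key and timestamp (later
    versions of the ProcessFunction docs call this timer coalescing),
    so those registrations collapse into one. The sketch below models
    that deduplication with a Set. The class and method names are
    hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper illustrating timestamp clamping ("timer coalescing").
final class TimerClamping {
    static final long INTERVAL = 120_000L; // 2-minute buckets

    // Rounds a non-negative timestamp up to the next 2-minute boundary,
    // so the timer never fires earlier than the unclamped one would.
    static long clampUp(long timestamp) {
        return (timestamp / INTERVAL + 1) * INTERVAL;
    }

    // Counts the distinct timers one key would hold if every element registered
    // a timer at (clamped) eventTime + timeout. The Set models the fact that
    // only one timer is kept per key and timestamp.
    static int distinctTimers(long[] eventTimes, long timeout, boolean clamp) {
        Set<Long> timers = new HashSet<>();
        for (long t : eventTimes) {
            long cleanup = t + timeout;
            timers.add(clamp ? clampUp(cleanup) : cleanup);
        }
        return timers.size();
    }
}
```

    Four events at 0 s, 10 s, 30 s and 59 s with a 60 s timeout produce
    four distinct raw timers but only one clamped timer, at 120 s.
    Because the timestamp is rounded up, state can live up to one
    interval longer than the timeout. The onTimer() check should
    therefore test lastModified + timeout <= timestamp rather than
    equality.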

    Best,
    Aljoscha

    On 5. Sep 2017, at 16:55, Kien Truong <duckientru...@gmail.com> wrote:

    Hi,

    You can register a processing-time timer inside the onTimer() and
    open() functions to get a timer that runs periodically.

    Pseudo-code example:

        ValueState<Long> lastRuntime;

        void open() {
            ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
        }

        void onTimer(long timestamp) {
            // Run the periodic task
            if (lastRuntime.value() + 60000 == timestamp) {
                periodicTask();
            }
            // Re-register the processing-time timer
            lastRuntime.update(timestamp);
            ctx.timerService().registerProcessingTimeTimer(timestamp + 60000);
        }
        void periodicTask() { ... }

    For the second question: timers are already scoped by key, so you
    can keep a lastModified variable in a ValueState and compare it to
    the timestamp provided by the timer to see whether the current key
    should be evicted. Check out the example on the ProcessFunction page:

    
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
    
<https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html>
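
    The lastModified check can be sketched as a Flink-free Java
    simulation. The class and method names are hypothetical, and a
    HashMap stands in for the per-key ValueState<Long>. Every element
    registers a timer at now + TIMEOUT. A timer only evicts the key if
    it matches the latest update, so older timers fire harmlessly. In
    Flink, onTimer() receives no key parameter. Keyed state accessed
    there is already scoped to the timer's key, as described above.

```java
import java.util.HashMap;
import java.util.Map;

// Flink-free simulation (hypothetical names) of the lastModified check:
// each element registers a timer at now + TIMEOUT, and onTimer() evicts
// only when the firing timer belongs to the most recent update.
class LastModifiedEviction {
    static final long TIMEOUT = 60_000L;

    // Stand-in for a per-key ValueState<Long> holding the last update time.
    private final Map<String, Long> lastModified = new HashMap<>();

    // Records the update and returns the timer timestamp that would be registered.
    long processElement(String key, long now) {
        lastModified.put(key, now);
        return now + TIMEOUT;
    }

    // Called for the key the timer was registered for; true if the key was evicted.
    boolean onTimer(String key, long timestamp) {
        Long last = lastModified.get(key);
        if (last != null && timestamp == last.longValue() + TIMEOUT) {
            lastModified.remove(key); // no newer element arrived: key is stale
            return true;
        }
        return false; // an older timer; a newer one is still pending
    }

    boolean hasState(String key) {
        return lastModified.containsKey(key);
    }
}
```

    Two elements at 0 s and 30 s register timers at 60 s and 90 s. The
    60 s timer finds a newer update and does nothing, while the 90 s
    timer evicts the key.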

    Best regards,
    Kien

    On 9/5/2017 11:49 AM, Navneeth Krishnan wrote:
    Hi All,

    I have a streaming pipeline that is keyed by userid and then goes
    to a flatMap function. I need to clear the state after some time,
    and I was looking at ProcessFunction for that.

    If I register a timer inside the processElement function, wouldn't
    it create a timer for each incoming message?

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);

    How can I get something like a cleanup task that runs every 2
    minutes and evicts all stale data? Also, is there a way to get the
    key inside the onTimer function so that I know which key has to be
    evicted?

    Thanks,
    Navneeth
