Hi Max

Thanks for your answer.
We have some states, on some keys, which we would like to delete after a 
certain time.
And since there is no option at the moment to put an "expiriece" date on it, I 
just use the snapshot function to test and verify if the current key is still 
in some threshold.

So I would like to have an option to perodically check the timestamp, and 
remove old entries from the state.

Therefore I used the KeyedStream to key by a id.
Can I use the internal function to do the snapshoting, but use the snapshot 
function to do some cleanup on the states?


> On 01 Jun 2016, at 16:29, Maximilian Michels <m...@apache.org> wrote:
> Hi Simon,
> You don't need to write any code to checkpoint the Keyed State. It is
> done automatically by Flink. Just remove the `snapshoteState(..)` and
> `restoreState(..)` methods.
> Cheers,
> Max
> On Wed, Jun 1, 2016 at 4:00 PM, simon peyer <simon.pe...@soom-it.ch> wrote:
>> Hi Max
>> I'm using a keyby but would like to store the state.
>> Thus what's the way to go?
>> How do I have to handle the state in option 2).
>> Could you give an example?
>> Thanks
>> --Simon
>>> On 01 Jun 2016, at 15:55, Maximilian Michels <m...@apache.org> wrote:
>>> Hi Simon,
>>> There are two types of state:
>>> 1) Keyed State
>>> The state you access via `getRuntimeContext().getState(..)` is scoped
>>> by key. If no key is in the scope, the key is null and update
>>> operations won't work. Use a `keyBy(..)` before your map function to
>>> partition the state by key. The state is automatically checkpointed by
>>> Flink.
>>> 2) Operator State
>>> This state is kept per parallel instance of the operator. You
>>> implement the Checkpointed interface and use the `snapshotState(..)`
>>> and `restoreState(..)` methods to checkpoint the state.
>>> I think you want to use one of the two. Although it is possible to use
>>> both, it looks like you're confusing the two in your example.
>>> Cheers,
>>> Max
>>> On Wed, Jun 1, 2016 at 3:06 PM, simon peyer <simon.pe...@soom-it.ch> wrote:
>>>> Hi together
>>>> I did implement a little pipeline, which has some statefull computation:
>>>> Conntaing a function which extends RichFlatMapFunction and Checkpointed.
>>>> The state is stored in the field:
>>>> private var state_item: ValueState[Option[Pathsection]] = null
>>>> override def open(conf: Configuration):Unit = {
>>>>   log.info("Open a new Checkpointed FlatMap function with configuration:
>>>> {}", conf)
>>>>   state_item = getRuntimeContext.getState(new ValueStateDescriptor("State
>>>> of Pathsection",
>>>> (Option(Pathsection)).getClass.asInstanceOf[Class[Option[Pathsection]]],
>>>> None))
>>>> }
>>>> override def snapshotState(checkpointId: Long, checkpointTimestamp: Long):
>>>> Option[Pathsection] = {
>>>>   log.debug("Snapshote State with checkpointId: {} at Timestamp {}",
>>>> checkpointId, checkpointTimestamp)
>>>>   removeOldEntries(checkpointTimestamp)
>>>>   state_item.value()
>>>> }
>>>> override def restoreState(state: Option[Pathsection]):Unit = {
>>>>   if (state == null){
>>>>     log.debug("Restore Snapshot: Null")
>>>>   }
>>>>   else if(state == None){
>>>>     log.debug("Restore Snapshot: None")
>>>>   }
>>>>   else if (state_item == null){
>>>>     log.debug("State Item not initialized")
>>>>   }
>>>>   else{
>>>>     state_item.update(state)
>>>>   }
>>>> }
>>>> But when I do run this computation and get the program to fail, I get the
>>>> following error:
>>>> java.lang.Exception: Could not restore checkpointed state to operators and
>>>> functions
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:457)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.lang.Exception: Failed to restore state to function: No key
>>>> available.
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:449)
>>>> ... 3 more
>>>> Caused by: java.lang.RuntimeException: No key available.
>>>> at
>>>> org.apache.flink.runtime.state.memory.MemValueState.update(MemValueState.java:69)
>>>> at ....Function which has the Checkpointed thingy
>>>> (CheckpointedIncrAddrPositions.scala:68)
>>>> What am I missing?
>>>> Thanks
>>>> Simon

Reply via email to