Hi,
yes, Stefano is spot on! The state is only restored if a job is restarted
because of an abnormal failure. For state that survives stopping/canceling a
job you can look at savepoints:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
This essentially uses the same mechanism as the fault-tolerance checkpoints
for state, but makes it explicit and allows restarting from different
savepoints.
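
As a rough illustration of that difference, here is a toy model in plain
Scala. It uses no Flink APIs at all: SavepointSketch, countWord and run are
hypothetical names, the Map stands in for the per-key state, and handing that
Map to the next run stands in for restarting from a savepoint. The counting
function has the same shape as the one Jack passed to mapWithState.

```scala
// Toy model only: plain Scala collections, no Flink classes involved.
object SavepointSketch {

  // Same shape as the function given to mapWithState in the quoted job:
  // takes the input and the current state for its key, and returns the
  // record to emit together with the new state for that key.
  def countWord(in: String, count: Option[Int]): ((String, Int), Option[Int]) = {
    val newCount = count.getOrElse(0) + 1
    ((in, newCount), Some(newCount))
  }

  // Feeds the words through countWord, starting from the given per-key state.
  // Returns the emitted records and the state as it stands after the run.
  def run(words: Seq[String],
          restored: Map[String, Int]): (Seq[(String, Int)], Map[String, Int]) =
    words.foldLeft((Vector.empty[(String, Int)], restored)) {
      case ((out, state), word) =>
        val (record, newState) = countWord(word, state.get(word))
        // None would clear the state for this key, Some(c) replaces it.
        val updated = newState.fold(state - word)(c => state.updated(word, c))
        (out :+ record, updated)
    }

  def main(args: Array[String]): Unit = {
    // First run: keep the state it ends with, like an explicit savepoint.
    val (_, savepoint) = run(Seq("a", "b", "a"), Map.empty)

    // Cancel and resubmit without a savepoint: the state starts out empty.
    val (fresh, _) = run(Seq("a"), Map.empty)

    // Resubmit from the kept snapshot: counting carries on from there.
    val (resumed, _) = run(Seq("a"), savepoint)

    println(fresh)
    println(resumed)
  }
}
```

Starting run with an empty Map, which is what cancel-and-resubmit amounts to
here, counts from 1 again; passing in the Map kept from the earlier run
continues from the previous counts.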

Cheers,
Aljoscha

On Wed, 20 Apr 2016 at 22:43 Stefano Baghino <stefano.bagh...@radicalbit.io>
wrote:

> Hello again,
>
> thanks for giving my advice a shot anyway, but Aljoscha is far more
> knowledgeable than me regarding Flink. :)
>
> I hope I'm not getting mixed up again but I think gracefully canceling
> your job means you lose your job state. Am I right in saying that the state
> is preserved in case of abnormal termination (e.g.: the JobManager crashes)
> or if you explicitly create a savepoint?
>
> On Wed, Apr 20, 2016 at 10:13 PM, Jack Huang <jackhu...@machinezone.com>
> wrote:
>
>> @Aljoscha:
>> For this word count example I am using a kafka topic as the input stream.
>> The problem is that when I cancel the task and restart it, the task loses
>> the accumulated word counts so far and starts counting from 1 again. Am I
>> missing something basic here?
>>
>> @Stefano:
>> I also tried to implement the Checkpointed interface but had no luck
>> either. Canceling and restarting the task did not restore the state. Here
>> is my class:
>>
>> inStream.flatMap({ _.toLowerCase.split("\\W+") filter { _.nonEmpty } })
>>   .keyBy({s => s})
>>   .map(new StatefulCounter)
>>
>>
>> class StatefulCounter extends RichMapFunction[String, (String,Int)]
>>     with Checkpointed[Integer] {
>>   private var count: Integer = 0
>>
>>   def map(in: String): (String,Int) = {
>>     count += 1
>>     return (in, count)
>>   }
>>   def snapshotState(l: Long, l1: Long): Integer = {
>>     count
>>   }
>>   def restoreState(state: Integer) {
>>     count = state
>>   }
>> }
>>
>>
>>
>> Thanks,
>>
>>
>> Jack Huang
>>
>> On Wed, Apr 20, 2016 at 5:36 AM, Stefano Baghino <
>> stefano.bagh...@radicalbit.io> wrote:
>>
>>> My bad, thanks for pointing that out.
>>>
>>> On Wed, Apr 20, 2016 at 1:49 PM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>> the *withState() family of functions uses the Key/Value state interface
>>>> internally, so that should work.
>>>>
>>>> On Wed, 20 Apr 2016 at 12:33 Stefano Baghino <
>>>> stefano.bagh...@radicalbit.io> wrote:
>>>>
>>>>> Hi Jack,
>>>>>
>>>>> it seems you correctly enabled the checkpointing by calling
>>>>> `env.enableCheckpointing`. However, your UDFs have to either implement the
>>>>> Checkpointed interface or use the Key/Value State interface to make sure
>>>>> the state of the computation is snapshotted.
>>>>>
>>>>> The documentation explains how to define your functions so that they
>>>>> checkpoint the state far better than I could in this post:
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html
>>>>>
>>>>> I hope I've been of some help, I'll gladly help you further if you
>>>>> need it.
>>>>>
>>>>> On Wed, Apr 20, 2016 at 11:34 AM, Aljoscha Krettek <
>>>>> aljos...@apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>> what seems to be the problem?
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>
>>>>>> On Wed, 20 Apr 2016 at 03:52 Jack Huang <jackhu...@machinezone.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I am doing a simple word count example and want to checkpoint the
>>>>>>> accumulated word counts. I am not having any luck getting the counts
>>>>>>> saved and restored. Can someone help?
>>>>>>>
>>>>>>> env.enableCheckpointing(1000)
>>>>>>> env.setStateBackend(new MemoryStateBackend())
>>>>>>>
>>>>>>> ...
>>>>>>>
>>>>>>> inStream
>>>>>>>     .keyBy({s => s})
>>>>>>>     .mapWithState((in: String, count: Option[Int]) => {
>>>>>>>       val newCount = count.getOrElse(0) + 1
>>>>>>>       ((in, newCount), Some(newCount))
>>>>>>>     })
>>>>>>>     .print()
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jack Huang
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> BR,
>>>>> Stefano Baghino
>>>>>
>>>>> Software Engineer @ Radicalbit
>>>>>
>>>>
>>>
>>>
>>> --
>>> BR,
>>> Stefano Baghino
>>>
>>> Software Engineer @ Radicalbit
>>>
>>
>>
>
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>
