Yes, Stefano is spot on! The state is only restored if a job is restarted
because of an abnormal failure. For state that survives stopping/canceling a
job, you can look at savepoints. A savepoint essentially uses the same
mechanisms as the fault-tolerance stuff for state, but makes it explicit and
allows restarting from different savepoints.
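
For reference, here is a minimal sketch of a per-key counter written against
the Key/Value state interface that Stefano mentions further down the thread.
The class name KeyedCounter and the state name "count" are made up for
illustration, and the ValueStateDescriptor constructor used here (name, class,
default value) is an assumption that may differ between Flink versions. It has
to be applied after keyBy, and, like any checkpointed state, it would only be
restored after a failure or when resuming from a savepoint, not after a plain
cancel and resubmit.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration

// Hypothetical per-key word counter backed by Flink's key/value state.
// The state handle is scoped to the key of the element being processed,
// so each word gets its own count.
class KeyedCounter extends RichMapFunction[String, (String, Int)] {

  // Obtained in open(); not serialized with the function itself.
  @transient private var count: ValueState[Integer] = _

  override def open(parameters: Configuration): Unit = {
    // Assumption: three-argument constructor with a default value of 0.
    val descriptor =
      new ValueStateDescriptor[Integer]("count", classOf[Integer], Integer.valueOf(0))
    count = getRuntimeContext.getState(descriptor)
  }

  override def map(in: String): (String, Int) = {
    val newCount: Int = count.value() + 1 // read the state for the current key
    count.update(newCount)                // write it back so it is snapshotted
    (in, newCount)
  }
}
```

Usage would look like `inStream.keyBy({s => s}).map(new KeyedCounter)`, which
should behave like the mapWithState version quoted below.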


On Wed, 20 Apr 2016 at 22:43 Stefano Baghino <stefano.bagh...@radicalbit.io>
wrote:

> Hello again,
> thanks for giving my advice a shot anyway, but Aljoscha is far more
> knowledgeable than me regarding Flink. :)
> I hope I'm not getting mixed up again but I think gracefully canceling
> your job means you lose your job state. Am I right in saying that the state
> is preserved in case of abnormal termination (e.g.: the JobManager crashes)
> or if you explicitly create a savepoint?
> On Wed, Apr 20, 2016 at 10:13 PM, Jack Huang <jackhu...@machinezone.com>
> wrote:
>> @Aljoscha:
>> For this word count example I am using a Kafka topic as the input stream.
>> The problem is that when I cancel the task and restart it, the task loses
>> the accumulated word counts so far and starts counting from 1 again. Am I
>> missing something basic here?
>> @Stefano:
>> I also tried to implement the Checkpointed interface but had no luck
>> either. Canceling and restarting the task did not restore the state. Here
>> is my class:
>> inStream.flatMap({ _.toLowerCase.split("\\W+") filter { _.nonEmpty } })
>>   .keyBy({s => s})
>>   .map(new StatefulCounter)
>>
>> class StatefulCounter extends RichMapFunction[String, (String, Int)]
>>     with Checkpointed[Integer] {
>>   private var count: Integer = 0
>>   def map(in: String): (String, Int) = {
>>     count += 1
>>     return (in, count)
>>   }
>>   def snapshotState(l: Long, l1: Long): Integer = {
>>     count
>>   }
>>   def restoreState(state: Integer) {
>>     count = state
>>   }
>> }
>> Thanks,
>> Jack Huang
>> On Wed, Apr 20, 2016 at 5:36 AM, Stefano Baghino <
>> stefano.bagh...@radicalbit.io> wrote:
>>> My bad, thanks for pointing that out.
>>> On Wed, Apr 20, 2016 at 1:49 PM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>> Hi,
>>>> the *withState() family of functions use the Key/Value state interface
>>>> internally, so that should work.
>>>> On Wed, 20 Apr 2016 at 12:33 Stefano Baghino <
>>>> stefano.bagh...@radicalbit.io> wrote:
>>>>> Hi Jack,
>>>>> it seems you correctly enabled the checkpointing by calling
>>>>> `env.enableCheckpointing`. However, your UDFs have to either implement the
>>>>> Checkpointed interface or use the Key/Value State interface to make sure
>>>>> the state of the computation is snapshotted.
>>>>> The documentation explains how to define your functions so that they
>>>>> checkpoint the state far better than I could in this post:
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html
>>>>> I hope I've been of some help, I'll gladly help you further if you
>>>>> need it.
>>>>> On Wed, Apr 20, 2016 at 11:34 AM, Aljoscha Krettek <
>>>>> aljos...@apache.org> wrote:
>>>>>> Hi,
>>>>>> what seems to be the problem?
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>> On Wed, 20 Apr 2016 at 03:52 Jack Huang <jackhu...@machinezone.com>
>>>>>> wrote:
>>>>>>> Hi all,
>>>>>>> I am doing a simple word count example and want to checkpoint the
>>>>>>> accumulated word counts. I am not having any luck getting the counts
>>>>>>> saved and restored. Can someone help?
>>>>>>> env.enableCheckpointing(1000)
>>>>>>> env.setStateBackend(new MemoryStateBackend())
>>>>>>>   ...
>>>>>>> inStream
>>>>>>>     .keyBy({s => s})
>>>>>>>     .mapWithState((in: String, count: Option[Int]) => {
>>>>>>>       val newCount = count.getOrElse(0) + 1
>>>>>>>       ((in, newCount), Some(newCount))
>>>>>>>     })
>>>>>>>     .print()
>>>>>>> Thanks,
>>>>>>> Jack Huang
>>>>> --
>>>>> BR,
>>>>> Stefano Baghino
>>>>> Software Engineer @ Radicalbit
