@Stefano, Aljoscha:

Thank you for pointing that out. With the following steps I verified that
the state of the job gets restored:

   1. Use HDFS as the state backend with
   env.setStateBackend(new FsStateBackend("hdfs:///home/user/flink/KafkaWordCount"))
   2. Start the job. In my case the job ID is
   e4b5316ae4ea0c8ed6fab4fa238b4b2f
   3. Observe that
   hdfs:///home/user/flink/KafkaWordCount/e4b5316ae4ea0c8ed6fab4fa238b4b2f
   is created
   4. Kill all TaskManagers, but leave the JobManager running
   5. Restart all TaskManagers with bin/start-cluster.sh
   6. Observe that the JobManager automatically restarts the job under the
   same job ID
   7. Observe from the output that the state is restored
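Why the counts continue after step 7 can be sketched with a toy model in plain Scala. This is not Flink code: `CheckpointRecoveryModel`, `Checkpoint` and `runWithFailure` are hypothetical names, and the model assumes a replayable source such as the Kafka topic used here, so that a checkpoint can hold the read offset together with the per-word counts. On an abnormal failure both are rolled back to the latest completed checkpoint and the records after it are read again, so the final counts match a run without any failure.

```scala
// Toy model (hypothetical names, not Flink's API). Assumes a replayable source,
// so a checkpoint can store the read offset together with the per-word counts.
object CheckpointRecoveryModel {
  // A completed checkpoint: next offset to read and the counts up to that offset.
  final case class Checkpoint(offset: Int, counts: Map[String, Int])

  // Count input[from, until) on top of the given counts.
  def process(input: Vector[String], from: Int, until: Int,
              counts: Map[String, Int]): Map[String, Int] =
    input.slice(from, until).foldLeft(counts) { (acc, w) =>
      acc.updated(w, acc.getOrElse(w, 0) + 1)
    }

  // Checkpoint every `interval` records and fail once when `failAt` records
  // have been processed. The failure discards the live counts and rolls both
  // the counts and the read offset back to the latest completed checkpoint.
  def runWithFailure(input: Vector[String], interval: Int, failAt: Int): Map[String, Int] = {
    var latest = Checkpoint(0, Map.empty)
    var counts = Map.empty[String, Int]
    var pos = 0
    var failed = false
    while (pos < input.length) {
      if (!failed && pos == failAt) {
        failed = true
        counts = latest.counts // restore state from the checkpoint
        pos = latest.offset    // replay the source from the checkpointed offset
      } else {
        counts = process(input, pos, pos + 1, counts)
        pos += 1
        if (pos % interval == 0) latest = Checkpoint(pos, counts)
      }
    }
    counts
  }
}
```

The point of storing the offset and the counts in the same checkpoint is that they stay consistent: records counted after the checkpoint are dropped from the state and then counted again exactly once during the replay.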

Jack

Jack Huang

On Thu, Apr 21, 2016 at 1:40 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> yes, Stefano is spot on! The state is only restored if a job is restarted
> because of an abnormal failure. For state that survives stopping/canceling a
> job, you can look at savepoints:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
> They essentially use the same mechanisms as the fault-tolerance machinery for
> state, but make it explicit and allow restarting from different savepoints.
>
> Cheers,
> Aljoscha
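The difference Aljoscha describes can be modeled in a few lines of plain Scala. This is a hypothetical sketch (`RestartSemanticsModel`, `Fresh` and `FromSnapshot` are made-up names, not Flink API): a job submitted again after a cancel starts from empty state, while an automatic restart after a failure, or a run resumed from a savepoint, starts from the stored snapshot. With the Flink 1.0 CLI the savepoint route roughly corresponds to `bin/flink savepoint <jobID>` before canceling and `bin/flink run -s <savepointPath> ...` to resume; check the savepoints page linked above for the exact commands in your version.

```scala
// Toy model (hypothetical names, not Flink's API) of which state a new run
// of the word-count job starts from.
object RestartSemanticsModel {
  type Counts = Map[String, Int]

  sealed trait Start
  // Job submitted again after a graceful cancel: no state is restored.
  case object Fresh extends Start
  // Automatic restart after a failure (latest checkpoint) or resume from a savepoint.
  final case class FromSnapshot(state: Counts) extends Start

  // Count words on top of an initial state.
  def count(initial: Counts, words: Seq[String]): Counts =
    words.foldLeft(initial)((acc, w) => acc.updated(w, acc.getOrElse(w, 0) + 1))

  def run(start: Start, words: Seq[String]): Counts = start match {
    case Fresh               => count(Map.empty, words)
    case FromSnapshot(state) => count(state, words)
  }
}
```

For example, if `savepoint = run(Fresh, Seq("a", "b", "a"))`, a fresh run over `Seq("a")` yields `Map(a -> 1)` (the "counting from 1 again" behavior described in this thread), while `run(FromSnapshot(savepoint), Seq("a"))` yields `Map(a -> 3, b -> 1)`.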
>
> On Wed, 20 Apr 2016 at 22:43 Stefano Baghino <
> stefano.bagh...@radicalbit.io> wrote:
>
>> Hello again,
>>
>> thanks for giving my advice a shot anyway, but Aljoscha is far more
>> knowledgeable than me regarding Flink. :)
>>
>> I hope I'm not getting mixed up again, but I think gracefully canceling
>> your job means you lose your job state. Am I right in saying that the state
>> is preserved in case of abnormal termination (e.g. the JobManager crashes)
>> or if you explicitly create a savepoint?
>>
>> On Wed, Apr 20, 2016 at 10:13 PM, Jack Huang <jackhu...@machinezone.com>
>> wrote:
>>
>>> @Aljoscha:
>>> For this word count example I am using a Kafka topic as the input
>>> stream. The problem is that when I cancel the task and restart it, the task
>>> loses the word counts accumulated so far and starts counting from 1 again.
>>> Am I missing something basic here?
>>>
>>> @Stefano:
>>> I also tried to implement the Checkpointed interface but had no luck
>>> either. Canceling and restarting the task did not restore the state. Here
>>> is my class:
>>>
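>>> import org.apache.flink.api.common.functions.RichMapFunction
>>> import org.apache.flink.streaming.api.checkpoint.Checkpointed
>>> import org.apache.flink.streaming.api.scala._
>>>
>>> inStream.flatMap({ _.toLowerCase.split("\\W+") filter { _.nonEmpty } })
>>>   .keyBy({s => s})
>>>   .map(new StatefulCounter)
>>>
>>> class StatefulCounter extends RichMapFunction[String, (String, Int)]
>>>     with Checkpointed[Integer] {
>>>   private var count: Integer = 0
>>>
>>>   override def map(in: String): (String, Int) = {
>>>     count += 1
>>>     (in, count)
>>>   }
>>>
>>>   // Called on each checkpoint; the returned value is stored by the state backend.
>>>   override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Integer =
>>>     count
>>>
>>>   // Called with the stored snapshot when the job is restored from a checkpoint.
>>>   override def restoreState(state: Integer): Unit = {
>>>     count = state
>>>   }
>>> }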
>>>
>>>
>>>
>>> Thanks,
>>>
>>>
>>> Jack Huang
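A related point about `StatefulCounter` above, based on the Flink 1.0 state documentation: the value returned from `Checkpointed.snapshotState` is operator state, i.e. one value per parallel instance, so even after `keyBy` the single `count` field is shared by every word routed to that instance. The plain-Scala toy model below (hypothetical names, not Flink code) contrasts that with keyed state, which keeps one value per key, as `mapWithState` does.

```scala
// Toy model (hypothetical names, not Flink's API) of one parallel instance of
// the map operator after keyBy, receiving words for several keys.
object OperatorVsKeyedState {
  // Like StatefulCounter: one counter for the whole instance, shared by all keys.
  def operatorCounter(words: List[String]): List[(String, Int)] = {
    var count = 0
    words.map { w => count += 1; (w, count) }
  }

  // Like Key/Value (keyed) state: one counter per key.
  def keyedCounter(words: List[String]): List[(String, Int)] = {
    var counts = Map.empty[String, Int]
    words.map { w =>
      val c = counts.getOrElse(w, 0) + 1
      counts = counts.updated(w, c)
      (w, c)
    }
  }
}
```

For the input `List("a", "b", "a")`, the operator-level counter emits `(a,3)` for the second "a" because "b" was counted too, while the keyed counter emits `(a,2)`.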
>>>
>>> On Wed, Apr 20, 2016 at 5:36 AM, Stefano Baghino <
>>> stefano.bagh...@radicalbit.io> wrote:
>>>
>>>> My bad, thanks for pointing that out.
>>>>
>>>> On Wed, Apr 20, 2016 at 1:49 PM, Aljoscha Krettek <aljos...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> the *withState() family of functions uses the Key/Value state interface
>>>>> internally, so that should work.
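To make this concrete, here is a small plain-Scala model of the `*withState` pattern (hypothetical `MapWithStateModel`, not Flink's implementation): the state stored for the record's key is passed to the user function as an `Option`, and the returned `Option` replaces it, with `None` clearing it. In Flink that per-key value lives in Key/Value state, which is why it is included in checkpoints without implementing `Checkpointed`.

```scala
import scala.collection.mutable

// Toy model (hypothetical, not Flink's implementation) of mapWithState:
// look up the state for the record's key, call the user function,
// emit its output and store (or clear) the returned state.
object MapWithStateModel {
  def mapWithState[T, K, R, S](records: List[T], key: T => K)
                              (fun: (T, Option[S]) => (R, Option[S])): List[R] = {
    val state = mutable.Map.empty[K, S] // stands in for keyed state
    records.map { in =>
      val k = key(in)
      val (out, next) = fun(in, state.get(k))
      next match {
        case Some(s) => state(k) = s   // update the value for this key
        case None    => state.remove(k) // None clears the value for this key
      }
      out
    }
  }
}
```

Feeding `List("a", "b", "a")` with the identity key through the counting function from the original question yields `List((a,1), (b,1), (a,2))`.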
>>>>>
>>>>> On Wed, 20 Apr 2016 at 12:33 Stefano Baghino <
>>>>> stefano.bagh...@radicalbit.io> wrote:
>>>>>
>>>>>> Hi Jack,
>>>>>>
>>>>>> it seems you correctly enabled checkpointing by calling
>>>>>> `env.enableCheckpointing`. However, your UDFs have to either implement the
>>>>>> Checkpointed interface or use the Key/Value State interface to make sure
>>>>>> the state of the computation is snapshotted.
>>>>>>
>>>>>> The documentation explains how to define your functions so that they
>>>>>> checkpoint the state far better than I could in this post:
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html
>>>>>>
>>>>>> I hope I've been of some help; I'll gladly help you further if you
>>>>>> need it.
>>>>>>
>>>>>> On Wed, Apr 20, 2016 at 11:34 AM, Aljoscha Krettek <
>>>>>> aljos...@apache.org> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> what seems to be the problem?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> On Wed, 20 Apr 2016 at 03:52 Jack Huang <jackhu...@machinezone.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I am doing a simple word count example and want to checkpoint the
>>>>>>>> accumulated word counts. I am not having any luck getting the counts
>>>>>>>> saved and restored. Can someone help?
>>>>>>>>
>>>>>>>> import org.apache.flink.runtime.state.memory.MemoryStateBackend
>>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>>
>>>>>>>> env.enableCheckpointing(1000) // checkpoint interval in milliseconds
>>>>>>>> env.setStateBackend(new MemoryStateBackend())
>>>>>>>>
>>>>>>>> ...
>>>>>>>>
>>>>>>>> inStream
>>>>>>>>   .keyBy({s => s})
>>>>>>>>   .mapWithState((in: String, count: Option[Int]) => {
>>>>>>>>     val newCount = count.getOrElse(0) + 1
>>>>>>>>     ((in, newCount), Some(newCount))
>>>>>>>>   })
>>>>>>>>   .print()
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Jack Huang
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> BR,
>>>>>> Stefano Baghino
>>>>>>
>>>>>> Software Engineer @ Radicalbit
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
>
