Hi Milind,
A job can be stopped with a savepoint in the following way [1]: ./bin/flink
stop --savepointPath [:targetDirectory] :jobId

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/#stopping-a-job-with-savepoint

On Sun, May 16, 2021 at 1:12 AM Milind Vaidya <kava...@gmail.com> wrote:

> Sounds good.
> How do I achieve stop with savepoint ?
>
> - Milind
>
> On Mon, Apr 26, 2021 at 12:55 AM Matthias Pohl <matth...@ververica.com>
> wrote:
>
>> I'm not sure what you're trying to achieve. Are you trying to simulate a
>> task failure? Or are you trying to pick up the state from a stopped job?
>> You could achieve the former one by killing the TaskManager instance or
>> by throwing a custom failure as part of your job pipeline. The latter one
>> can be achieved by using stop-with-savepoint instead of canceling the job.
>>
>> Matthias
>>
>> On Fri, Apr 23, 2021 at 9:31 PM Milind Vaidya <kava...@gmail.com> wrote:
>>
>>> Hi Matthias,
>>>
>>> Yeah you are right. I am canceling the job and hence it is creating new
>>> job with new job id and hence it is no respecting previous checkpoint. I
>>> observed same behaviour even for local FS backend.
>>>
>>> Is there any way to simulated failing of job locally ?
>>>
>>> As far as config is concerned, I have not configured any back end in the
>>> conf file and defaulting to Memory Checkpoint.
>>>
>>> Thanks,
>>> Milind
>>>
>>>
>>>
>>> On Fri, Apr 23, 2021 at 12:32 AM Matthias Pohl <matth...@ververica.com>
>>> wrote:
>>>
>>>> One additional question: How did you stop and restart the job? The
>>>> behavior you're expecting should work with stop-with-savepoint. Cancelling
>>>> the job and then just restarting it wouldn't work. The latter approach
>>>> would lead to a new job being created.
>>>>
>>>> Best,
>>>> Matthias
>>>>
>>>> On Thu, Apr 22, 2021 at 3:12 PM Matthias Pohl <matth...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi Milind,
>>>>> I bet someone else might have a faster answer. But could you provide
>>>>> the logs and config to get a better understanding of what your issue is?
>>>>> In general, the state is maintained even in cases where a TaskManager
>>>>> fails.
>>>>>
>>>>> Best,
>>>>> Matthias
>>>>>
>>>>> On Thu, Apr 22, 2021 at 5:11 AM Milind Vaidya <kava...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> I see MemoryStateBackend being used in TM Log
>>>>>>
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask - No state
>>>>>> backend has been configured, using default (Memory / JobManager)
>>>>>> MemoryStateBackend (data in heap memory / checkpoints to JobManager)
>>>>>> (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE,
>>>>>> maxStateSize: 5242880)
>>>>>>
>>>>>>
>>>>>>
>>>>>> I am logging checkpointed value which is just message count
>>>>>>
>>>>>> Snapshot the state 500
>>>>>> Snapshot the state 1000
>>>>>>
>>>>>>
>>>>>> When I restart the job i.e. new TM but the job manager is same I see
>>>>>>
>>>>>> Snapshot the state 500
>>>>>>
>>>>>> In the JM logs I see following entries
>>>>>>
>>>>>> Triggering checkpoint 1
>>>>>> Triggering checkpoint 2
>>>>>>
>>>>>> After restarting job hence new TM
>>>>>>
>>>>>> Triggering checkpoint 1
>>>>>>
>>>>>> As per my understanding JM should hold the checkpointed
>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html#the-memorystatebackend>
>>>>>> state across TM ? Am I correct?
>>>>>>
>>>>>> I have not configured anything special and using default. Do I need
>>>>>> to add any setting to make it work ?
>>>>>> I want to maintain message count across the TMs.
>>>>>>
>>>>>

Reply via email to