Sounds good.
How do I perform a stop-with-savepoint?

- Milind
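
For reference, here is a minimal sketch of the stop-with-savepoint flow, assuming the standard Flink CLI (Flink 1.9 or later). `flink stop --savepointPath <dir> <jobId>` takes a savepoint and then stops the job, and `flink run --fromSavepoint <path> <jar>` submits the job again with its state restored from that savepoint. The Java helper below only assembles those two command lines, for example to pass to ProcessBuilder. The class name, the savepoint directory, the job ID, and the jar name are hypothetical placeholders, not values from this thread.

```java
import java.util.List;

/** Builds Flink CLI command lines for stop-with-savepoint and resume (illustrative helper, not part of Flink). */
final class StopWithSavepointCommands {

    private StopWithSavepointCommands() {}

    /** flink stop --savepointPath <dir> <jobId>: take a savepoint, then stop the job gracefully. */
    static List<String> stopWithSavepoint(String flinkBin, String savepointDir, String jobId) {
        return List.of(flinkBin, "stop", "--savepointPath", savepointDir, jobId);
    }

    /** flink run --fromSavepoint <path> <jar>: submit the job again, restoring state from the savepoint. */
    static List<String> resumeFromSavepoint(String flinkBin, String savepointPath, String jobJar) {
        return List.of(flinkBin, "run", "--fromSavepoint", savepointPath, jobJar);
    }

    public static void main(String[] args) {
        // All values below are placeholders; substitute your own job ID, paths, and jar.
        List<String> stop = stopWithSavepoint("bin/flink", "/tmp/flink-savepoints", "<jobId>");
        List<String> resume = resumeFromSavepoint("bin/flink", "<savepointPath>", "my-job.jar");
        System.out.println(String.join(" ", stop));
        System.out.println(String.join(" ", resume));
        // To actually run a command: new ProcessBuilder(stop).inheritIO().start().waitFor();
    }
}
```

The stop command reports the path of the completed savepoint; that path is what the run command expects. The resubmitted job still gets a new job ID, but it starts from the saved state instead of from scratch.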

On Mon, Apr 26, 2021 at 12:55 AM Matthias Pohl <matth...@ververica.com>
wrote:

> I'm not sure what you're trying to achieve. Are you trying to simulate a
> task failure? Or are you trying to pick up the state from a stopped job?
> You could achieve the former one by killing the TaskManager instance or by
> throwing a custom failure as part of your job pipeline. The latter one can
> be achieved by using stop-with-savepoint instead of canceling the job.
>
> Matthias
>
> On Fri, Apr 23, 2021 at 9:31 PM Milind Vaidya <kava...@gmail.com> wrote:
>
>> Hi Matthias,
>>
>> Yes, you are right. I am canceling the job, so restarting it creates a new
>> job with a new job ID, and the new job does not respect the previous
>> checkpoint. I observed the same behaviour even with the local FS backend.
>>
>> Is there any way to simulate a job failure locally?
>>
>> As far as the config is concerned, I have not configured any state backend
>> in the conf file, so the job defaults to the MemoryStateBackend.
>>
>> Thanks,
>> Milind
>>
>>
>>
>> On Fri, Apr 23, 2021 at 12:32 AM Matthias Pohl <matth...@ververica.com>
>> wrote:
>>
>>> One additional question: How did you stop and restart the job? The
>>> behavior you're expecting should work with stop-with-savepoint. Cancelling
>>> the job and then just restarting it wouldn't work. The latter approach
>>> would lead to a new job being created.
>>>
>>> Best,
>>> Matthias
>>>
>>> On Thu, Apr 22, 2021 at 3:12 PM Matthias Pohl <matth...@ververica.com>
>>> wrote:
>>>
>>>> Hi Milind,
>>>> I bet someone else might have a faster answer. But could you provide
>>>> the logs and config to get a better understanding of what your issue is?
>>>> In general, the state is maintained even in cases where a TaskManager
>>>> fails.
>>>>
>>>> Best,
>>>> Matthias
>>>>
>>>> On Thu, Apr 22, 2021 at 5:11 AM Milind Vaidya <kava...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> I see MemoryStateBackend being used in the TM log:
>>>>>
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask - No state
>>>>> backend has been configured, using default (Memory / JobManager)
>>>>> MemoryStateBackend (data in heap memory / checkpoints to JobManager)
>>>>> (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE,
>>>>> maxStateSize: 5242880)
>>>>>
>>>>>
>>>>>
>>>>> I am logging the checkpointed value, which is just the message count:
>>>>>
>>>>> Snapshot the state 500
>>>>> Snapshot the state 1000
>>>>>
>>>>>
>>>>> When I restart the job (i.e. a new TM, but the same JobManager), I see:
>>>>>
>>>>> Snapshot the state 500
>>>>>
>>>>> In the JM logs I see the following entries:
>>>>>
>>>>> Triggering checkpoint 1
>>>>> Triggering checkpoint 2
>>>>>
>>>>> After restarting the job (hence a new TM):
>>>>>
>>>>> Triggering checkpoint 1
>>>>>
>>>>> As per my understanding, the JM should hold the checkpointed
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html#the-memorystatebackend>
>>>>> state across TMs. Am I correct?
>>>>>
>>>>> I have not configured anything special and am using the defaults. Do I
>>>>> need to add any setting to make it work?
>>>>> I want to maintain the message count across TMs.
>>>>>
>>>>
