I'm not sure what you're trying to achieve. Are you trying to simulate a
task failure? Or are you trying to pick up the state from a stopped job?
You could achieve the former by killing the TaskManager instance or by
throwing a custom exception as part of your job pipeline. The latter can
be achieved by using stop-with-savepoint instead of canceling the job, and
then resubmitting the job from that savepoint.
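Here is a minimal sketch of the custom-exception approach. It is written in plain Java so that it compiles without Flink on the classpath. `FailOnceInjector` and `SimulatedFailureException` are hypothetical names, not Flink APIs. The helper passes records through unchanged and throws a custom exception on a chosen record, but only on the first attempt. That way a restarted pipeline can continue instead of failing on every attempt. In a real job, the same logic would typically sit in a `RichMapFunction`, with the attempt number read from `getRuntimeContext().getAttemptNumber()`. Checkpointing also has to be enabled (for example via `env.enableCheckpointing(...)`) for the restarted tasks to resume from the last completed checkpoint.

```java
import java.util.function.Function;

/**
 * Hypothetical failure-injection helper (not a Flink API). It passes records
 * through unchanged and throws a custom exception when record number
 * failAtRecord arrives, but only on the first attempt, so that a restarted
 * pipeline can make progress instead of failing on every attempt.
 */
class FailOnceInjector implements Function<String, String> {

    /** Custom exception type that marks the failure as intentional. */
    static class SimulatedFailureException extends RuntimeException {
        SimulatedFailureException(String message) {
            super(message);
        }
    }

    private final long failAtRecord;  // 1-based index of the record that triggers the failure
    private final int attemptNumber;  // 0 on the first attempt, > 0 after restarts
    private long seen;                // records handled by this instance so far

    FailOnceInjector(long failAtRecord, int attemptNumber) {
        this.failAtRecord = failAtRecord;
        this.attemptNumber = attemptNumber;
    }

    @Override
    public String apply(String record) {
        seen++;
        // Fail only on the first attempt; later attempts pass everything through.
        if (attemptNumber == 0 && seen == failAtRecord) {
            throw new SimulatedFailureException("Simulated failure at record " + seen);
        }
        return record;
    }

    long seen() {
        return seen;
    }
}
```

With `attemptNumber` set to 0 and `failAtRecord` set to 3, the third call to `apply` throws. With any later attempt number, the same input passes through. For the stop-with-savepoint route, the Flink CLI provides `flink stop --savepointPath <savepointDir> <jobId>`. The savepoint it writes can then be passed to `flink run -s <savepointPath> <jarFile>` when resubmitting the job.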

Matthias

On Fri, Apr 23, 2021 at 9:31 PM Milind Vaidya <kava...@gmail.com> wrote:

> Hi Matthias,
>
> Yeah, you are right. I am canceling the job, so restarting it creates a
> new job with a new job ID, and hence it does not respect the previous
> checkpoint. I observed the same behaviour even with the local FS backend.
>
> Is there any way to simulate a job failure locally?
>
> As far as the config is concerned, I have not configured any backend in the
> conf file, so it defaults to in-memory checkpointing (MemoryStateBackend).
>
> Thanks,
> Milind
>
>
>
> On Fri, Apr 23, 2021 at 12:32 AM Matthias Pohl <matth...@ververica.com>
> wrote:
>
>> One additional question: How did you stop and restart the job? The
>> behavior you're expecting should work with stop-with-savepoint. Cancelling
>> the job and then just restarting it wouldn't work. The latter approach
>> would lead to a new job being created.
>>
>> Best,
>> Matthias
>>
>> On Thu, Apr 22, 2021 at 3:12 PM Matthias Pohl <matth...@ververica.com>
>> wrote:
>>
>>> Hi Milind,
>>> I bet someone else might have a faster answer. But could you provide the
>>> logs and config to get a better understanding of what your issue is?
>>> In general, the state is maintained even in cases where a TaskManager
>>> fails.
>>>
>>> Best,
>>> Matthias
>>>
>>> On Thu, Apr 22, 2021 at 5:11 AM Milind Vaidya <kava...@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> I see MemoryStateBackend being used in the TM log:
>>>>
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend
>>>> has been configured, using default (Memory / JobManager)
>>>> MemoryStateBackend (data in heap memory / checkpoints to JobManager)
>>>> (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE,
>>>> maxStateSize: 5242880)
>>>>
>>>>
>>>>
>>>> I am logging the checkpointed value, which is just a message count:
>>>>
>>>> Snapshot the state 500
>>>> Snapshot the state 1000
>>>>
>>>>
>>>> When I restart the job (i.e., a new TM, but the same JobManager), I see
>>>>
>>>> Snapshot the state 500
>>>>
>>>> In the JM logs I see the following entries:
>>>>
>>>> Triggering checkpoint 1
>>>> Triggering checkpoint 2
>>>>
>>>> After restarting the job (and hence with a new TM):
>>>>
>>>> Triggering checkpoint 1
>>>>
>>>> As per my understanding, the JM should hold the checkpointed
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html#the-memorystatebackend>
>>>> state across TMs. Am I correct?
>>>>
>>>> I have not configured anything special and am using the defaults. Do I
>>>> need to add any setting to make this work?
>>>> I want to maintain the message count across TMs.
>>>>
>>>
