I'm not sure what you're trying to achieve. Are you trying to simulate a task failure, or are you trying to pick up the state from a stopped job? You can achieve the former by killing the TaskManager instance or by throwing a custom exception as part of your job pipeline. The latter can be achieved by using stop-with-savepoint instead of canceling the job, and then restarting the job from that savepoint.
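
To illustrate the "throw a custom failure in the pipeline" option, here is a minimal sketch of a fail-once injector. It is deliberately written against plain Java (`java.util.function.Function`) so it runs without Flink on the classpath. In a real job the same logic would presumably sit inside the `map` method of a Flink `MapFunction`. The class name `FailOnceInjector` and its `reset` method are hypothetical and not part of Flink. The static flag is an assumption that the restarted task runs in the same JVM (for example a local run from the IDE); it would not survive a new TaskManager process.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

/**
 * Hypothetical helper (not part of Flink): passes values through unchanged
 * and throws exactly once, when the configured record count is reached.
 */
final class FailOnceInjector implements Function<Long, Long> {

    // Static so the flag survives when the function is re-created in the
    // same JVM after a restart; it does not survive a new process.
    private static final AtomicBoolean ALREADY_FAILED = new AtomicBoolean(false);

    private final long failAtCount;
    private long seen = 0;

    FailOnceInjector(long failAtCount) {
        this.failAtCount = failAtCount;
    }

    @Override
    public Long apply(Long value) {
        seen++;
        // compareAndSet ensures the failure is injected at most once per JVM.
        if (seen >= failAtCount && ALREADY_FAILED.compareAndSet(false, true)) {
            throw new RuntimeException("Injected failure after " + seen + " records");
        }
        return value;
    }

    /** Clears the flag, e.g. between test runs. */
    static void reset() {
        ALREADY_FAILED.set(false);
    }

    public static void main(String[] args) {
        FailOnceInjector injector = new FailOnceInjector(3);
        for (long i = 1; i <= 5; i++) {
            try {
                System.out.println("processed " + injector.apply(i));
            } catch (RuntimeException e) {
                System.out.println("caught: " + e.getMessage());
            }
        }
    }
}
```

Because the failure fires only once, the job can recover from its latest checkpoint instead of failing in a loop, which makes it possible to compare the logged message count before and after the restart. This assumes checkpointing is enabled and a restart strategy allows at least one restart.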

Matthias

On Fri, Apr 23, 2021 at 9:31 PM Milind Vaidya <kava...@gmail.com> wrote:

> Hi Matthias,
>
> Yeah you are right. I am canceling the job and hence it is creating new
> job with new job id and hence it is no respecting previous checkpoint. I
> observed same behaviour even for local FS backend.
>
> Is there any way to simulated failing of job locally ?
>
> As far as config is concerned, I have not configured any back end in the
> conf file and defaulting to Memory Checkpoint.
>
> Thanks,
> Milind
>
> On Fri, Apr 23, 2021 at 12:32 AM Matthias Pohl <matth...@ververica.com>
> wrote:
>
>> One additional question: How did you stop and restart the job? The
>> behavior you're expecting should work with stop-with-savepoint. Cancelling
>> the job and then just restarting it wouldn't work. The latter approach
>> would lead to a new job being created.
>>
>> Best,
>> Matthias
>>
>> On Thu, Apr 22, 2021 at 3:12 PM Matthias Pohl <matth...@ververica.com>
>> wrote:
>>
>>> Hi Milind,
>>> I bet someone else might have a faster answer. But could you provide the
>>> logs and config to get a better understanding of what your issue is?
>>> In general, the state is maintained even in cases where a TaskManager
>>> fails.
>>>
>>> Best,
>>> Matthias
>>>
>>> On Thu, Apr 22, 2021 at 5:11 AM Milind Vaidya <kava...@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> I see MemoryStateBackend being used in TM Log
>>>>
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend
>>>> has been configured, using default (Memory / JobManager)
>>>> MemoryStateBackend (data in heap memory / checkpoints to JobManager)
>>>> (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE,
>>>> maxStateSize: 5242880)
>>>>
>>>> I am logging checkpointed value which is just message count
>>>>
>>>> Snapshot the state 500
>>>> Snapshot the state 1000
>>>>
>>>> When I restart the job i.e. new TM but the job manager is same I see
>>>>
>>>> Snapshot the state 500
>>>>
>>>> In the JM logs I see following entries
>>>>
>>>> Triggering checkpoint 1
>>>> Triggering checkpoint 2
>>>>
>>>> After restarting job hence new TM
>>>>
>>>> Triggering checkpoint 1
>>>>
>>>> As per my understanding JM should hold the checkpointed
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html#the-memorystatebackend>
>>>> state across TM ? Am I correct?
>>>>
>>>> I have not configured anything special and using default. Do I need to
>>>> add any setting to make it work ?
>>>> I want to maintain message count across the TMs.