Hi Milind,

A job can be stopped with a savepoint in the following way [1]:

    ./bin/flink stop --savepointPath [:targetDirectory] :jobId
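A savepoint only helps if the next submission is told to restore from it. Resubmitting without it starts from empty state, just as after cancelling. The following is a minimal Python sketch of the stop-and-resume cycle. It assumes the CLI is invoked as `./bin/flink` from the Flink directory. It also assumes that `flink stop` reports the savepoint location on a line of the form `Savepoint completed. Path: <path>`, which is an assumption about the output format and not a documented contract. The helper names are hypothetical; only the `stop --savepointPath` and `run --detached --fromSavepoint` flags come from the Flink CLI.

```python
import re
import subprocess
from typing import List, Optional

# Assumption: run from the Flink distribution directory, as in "./bin/flink" above.
FLINK_BIN = "./bin/flink"


def stop_with_savepoint_cmd(job_id: str, target_dir: str) -> List[str]:
    """argv for: ./bin/flink stop --savepointPath <targetDirectory> <jobId>"""
    return [FLINK_BIN, "stop", "--savepointPath", target_dir, job_id]


def resume_from_savepoint_cmd(savepoint_path: str, jar_file: str) -> List[str]:
    """argv for submitting the job again, restoring its state from a savepoint.

    --detached returns once the job is submitted instead of blocking
    until the (possibly unbounded) streaming job finishes.
    """
    return [FLINK_BIN, "run", "--detached", "--fromSavepoint", savepoint_path, jar_file]


def parse_savepoint_path(cli_output: str) -> Optional[str]:
    """Extract the savepoint location from `flink stop` output.

    Assumption (hypothetical format): the CLI prints a line like
    "Savepoint completed. Path: <path>".
    """
    match = re.search(r"Savepoint completed\. Path: (\S+)", cli_output)
    return match.group(1) if match else None


def stop_and_resume(job_id: str, target_dir: str, jar_file: str) -> str:
    """Stop a running job with a savepoint and resubmit it from that savepoint."""
    stopped = subprocess.run(
        stop_with_savepoint_cmd(job_id, target_dir),
        check=True, capture_output=True, text=True,
    )
    savepoint = parse_savepoint_path(stopped.stdout)
    if savepoint is None:
        raise RuntimeError("no savepoint path found in 'flink stop' output")
    # The resubmitted job starts from the savepoint's state rather than from scratch.
    subprocess.run(resume_from_savepoint_cmd(savepoint, jar_file), check=True)
    return savepoint
```

A call would look like `stop_and_resume("<jobId>", "/tmp/flink-savepoints", "your-job.jar")`, where the directory and jar name are placeholders. The command builders are kept as pure functions so they can be checked without a running cluster.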
Best,
Matthias

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/#stopping-a-job-with-savepoint

On Sun, May 16, 2021 at 1:12 AM Milind Vaidya <kava...@gmail.com> wrote:
> Sounds good.
> How do I achieve stop-with-savepoint?
>
> - Milind
>
> On Mon, Apr 26, 2021 at 12:55 AM Matthias Pohl <matth...@ververica.com> wrote:
>> I'm not sure what you're trying to achieve. Are you trying to simulate a task failure? Or are you trying to pick up the state from a stopped job? You could achieve the former by killing the TaskManager instance or by throwing a custom failure as part of your job pipeline. The latter can be achieved by using stop-with-savepoint instead of canceling the job.
>>
>> Matthias
>>
>> On Fri, Apr 23, 2021 at 9:31 PM Milind Vaidya <kava...@gmail.com> wrote:
>>> Hi Matthias,
>>>
>>> Yeah, you are right. I am canceling the job, so it creates a new job with a new job ID and therefore does not pick up the previous checkpoint. I observed the same behaviour with the local FS backend as well.
>>>
>>> Is there any way to simulate a job failure locally?
>>>
>>> As far as the config is concerned, I have not configured any backend in the conf file, so it defaults to the MemoryStateBackend.
>>>
>>> Thanks,
>>> Milind
>>>
>>> On Fri, Apr 23, 2021 at 12:32 AM Matthias Pohl <matth...@ververica.com> wrote:
>>>> One additional question: How did you stop and restart the job? The behavior you're expecting should work with stop-with-savepoint. Cancelling the job and then just restarting it wouldn't work; that approach leads to a new job being created.
>>>>
>>>> Best,
>>>> Matthias
>>>>
>>>> On Thu, Apr 22, 2021 at 3:12 PM Matthias Pohl <matth...@ververica.com> wrote:
>>>>> Hi Milind,
>>>>> I bet someone else might have a faster answer, but could you provide the logs and config so I can get a better understanding of your issue?
>>>>> In general, the state is maintained even in cases where a TaskManager fails.
>>>>>
>>>>> Best,
>>>>> Matthias
>>>>>
>>>>> On Thu, Apr 22, 2021 at 5:11 AM Milind Vaidya <kava...@gmail.com> wrote:
>>>>>> Hi
>>>>>>
>>>>>> I see the MemoryStateBackend being used in the TM log:
>>>>>>
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
>>>>>>
>>>>>> I am logging the checkpointed value, which is just a message count:
>>>>>>
>>>>>> Snapshot the state 500
>>>>>> Snapshot the state 1000
>>>>>>
>>>>>> When I restart the job (i.e., a new TM, but the same JobManager), I see:
>>>>>>
>>>>>> Snapshot the state 500
>>>>>>
>>>>>> In the JM logs I see the following entries:
>>>>>>
>>>>>> Triggering checkpoint 1
>>>>>> Triggering checkpoint 2
>>>>>>
>>>>>> After restarting the job (hence a new TM):
>>>>>>
>>>>>> Triggering checkpoint 1
>>>>>>
>>>>>> As per my understanding, the JM should hold the checkpointed <https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html#the-memorystatebackend> state across TMs. Am I correct?
>>>>>>
>>>>>> I have not configured anything special and am using the defaults. Do I need to add any setting to make it work?
>>>>>> I want to maintain the message count across TMs.
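The three situations distinguished in this thread can be illustrated with a toy model that does not use Flink at all. The first is a task failure inside one job, the second is cancel-and-resubmit, and the third is stop-with-savepoint followed by a resubmission from that savepoint. In this hypothetical `ToyCluster`, none of whose names are Flink APIs, checkpoints are stored per job ID. That loosely mirrors the log line above, where checkpoints go to the JobManager for a given job. Savepoints, in contrast, are stored by path and outlive the job that wrote them. The model is a sketch of the semantics described in the replies, not of Flink's implementation.

```python
import itertools
from typing import Dict, Optional


class ToyCluster:
    """Hypothetical, Flink-free model of where a job's counter state lives.

    - checkpoints: keyed by job ID, so only the same job can restore them.
    - savepoints: keyed by path, so a later, new job can restore them.
    """

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self.running: Dict[str, int] = {}      # job ID -> live message count
        self.checkpoints: Dict[str, int] = {}  # job ID -> latest checkpointed count
        self.savepoints: Dict[str, int] = {}   # savepoint path -> saved count

    def submit(self, from_savepoint: Optional[str] = None) -> str:
        """Start a new job (always a new job ID), optionally from a savepoint."""
        job_id = f"job-{next(self._ids)}"
        start = self.savepoints[from_savepoint] if from_savepoint is not None else 0
        self.running[job_id] = start
        return job_id

    def process(self, job_id: str, messages: int) -> None:
        self.running[job_id] += messages

    def checkpoint(self, job_id: str) -> None:
        self.checkpoints[job_id] = self.running[job_id]

    def fail_task(self, job_id: str) -> None:
        """A task failure restarts the same job from its latest checkpoint.

        Messages processed after that checkpoint would be replayed by a
        replayable source in a real pipeline; this toy does not model that.
        """
        self.running[job_id] = self.checkpoints.get(job_id, 0)

    def cancel(self, job_id: str) -> None:
        """Cancelling ends the job; a later submit() gets a new ID and empty state."""
        del self.running[job_id]

    def stop_with_savepoint(self, job_id: str, path: str) -> str:
        """Write the current count to a savepoint, then stop the job."""
        self.savepoints[path] = self.running.pop(job_id)
        return path


if __name__ == "__main__":
    cluster = ToyCluster()
    job = cluster.submit()
    for _ in range(2):
        cluster.process(job, 500)
        cluster.checkpoint(job)               # checkpoints at 500 and 1000
    cluster.fail_task(job)
    print("after task failure:", cluster.running[job])        # 1000

    cluster.cancel(job)
    new_job = cluster.submit()                # cancel + resubmit
    print("after cancel + resubmit:", cluster.running[new_job])  # 0

    cluster.process(new_job, 1000)
    sp = cluster.stop_with_savepoint(new_job, "/tmp/savepoints/sp-1")
    resumed = cluster.submit(from_savepoint=sp)
    print("after stop-with-savepoint + resume:", cluster.running[resumed])  # 1000
```

In the model, cancelling and resubmitting resets the count to zero. That is consistent with the first snapshot after the restart logging 500 again instead of continuing past 1000.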