Sounds good. How do I do a stop-with-savepoint?

- Milind
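For reference, a minimal sketch of what stop-with-savepoint could look like from the Flink CLI, assuming a Flink version whose `flink stop` command accepts `--savepointPath` (1.9 or later, as far as I know). The job ID, savepoint directory, and jar name are hypothetical placeholders, not values from this thread. The script only prints the commands so that it can be read and run without a cluster.

```shell
#!/usr/bin/env bash
# Dry-run sketch: prints the Flink CLI commands instead of executing them.
# All values below are hypothetical placeholders.
JOB_ID="REPLACE_WITH_JOB_ID"                  # as shown by `flink list`
SAVEPOINT_DIR="file:///tmp/flink-savepoints"  # a directory Flink can write to
JOB_JAR="my-job.jar"                          # the jar the job was started from

# 1. Stop the job gracefully and write a savepoint into SAVEPOINT_DIR.
STOP_CMD="flink stop --savepointPath ${SAVEPOINT_DIR} ${JOB_ID}"

# 2. Resubmit the job from the savepoint written by step 1.
#    SAVEPOINT_PATH stands in for the path that step 1 prints on success.
SAVEPOINT_PATH="${SAVEPOINT_DIR}/savepoint-REPLACE"
RESUME_CMD="flink run -s ${SAVEPOINT_PATH} ${JOB_JAR}"

echo "${STOP_CMD}"
echo "${RESUME_CMD}"
```

Run by hand, the first command should report the savepoint location, and that location is what gets passed to `-s` when the job is resubmitted. The resubmitted job still gets a new job ID, but it starts from the saved state rather than from scratch.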
On Mon, Apr 26, 2021 at 12:55 AM Matthias Pohl <matth...@ververica.com> wrote:

> I'm not sure what you're trying to achieve. Are you trying to simulate a
> task failure? Or are you trying to pick up the state from a stopped job?
> You could achieve the former by killing the TaskManager instance or by
> throwing a custom failure as part of your job pipeline. The latter can
> be achieved by using stop-with-savepoint instead of canceling the job.
>
> Matthias
>
> On Fri, Apr 23, 2021 at 9:31 PM Milind Vaidya <kava...@gmail.com> wrote:
>
>> Hi Matthias,
>>
>> Yeah, you are right. I am canceling the job, so it creates a new job
>> with a new job ID and hence does not respect the previous checkpoint. I
>> observed the same behaviour even with the local FS backend.
>>
>> Is there any way to simulate a job failure locally?
>>
>> As far as config is concerned, I have not configured any state backend
>> in the conf file and am defaulting to the memory state backend.
>>
>> Thanks,
>> Milind
>>
>> On Fri, Apr 23, 2021 at 12:32 AM Matthias Pohl <matth...@ververica.com>
>> wrote:
>>
>>> One additional question: How did you stop and restart the job? The
>>> behavior you're expecting should work with stop-with-savepoint.
>>> Cancelling the job and then just restarting it wouldn't work. The
>>> latter approach would lead to a new job being created.
>>>
>>> Best,
>>> Matthias
>>>
>>> On Thu, Apr 22, 2021 at 3:12 PM Matthias Pohl <matth...@ververica.com>
>>> wrote:
>>>
>>>> Hi Milind,
>>>> I bet someone else might have a faster answer. But could you provide
>>>> the logs and config to get a better understanding of what your issue
>>>> is? In general, the state is maintained even in cases where a
>>>> TaskManager fails.
>>>>
>>>> Best,
>>>> Matthias
>>>>
>>>> On Thu, Apr 22, 2021 at 5:11 AM Milind Vaidya <kava...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> I see MemoryStateBackend being used in the TM log:
>>>>>
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask - No state
>>>>> backend has been configured, using default (Memory / JobManager)
>>>>> MemoryStateBackend (data in heap memory / checkpoints to JobManager)
>>>>> (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE,
>>>>> maxStateSize: 5242880)
>>>>>
>>>>> I am logging the checkpointed value, which is just the message count:
>>>>>
>>>>> Snapshot the state 500
>>>>> Snapshot the state 1000
>>>>>
>>>>> When I restart the job, i.e. with a new TM but the same JobManager,
>>>>> I see:
>>>>>
>>>>> Snapshot the state 500
>>>>>
>>>>> In the JM logs I see the following entries:
>>>>>
>>>>> Triggering checkpoint 1
>>>>> Triggering checkpoint 2
>>>>>
>>>>> After restarting the job, and hence with a new TM:
>>>>>
>>>>> Triggering checkpoint 1
>>>>>
>>>>> As per my understanding, the JM should hold the checkpointed
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html#the-memorystatebackend>
>>>>> state across TMs. Am I correct?
>>>>>
>>>>> I have not configured anything special and am using the defaults. Do
>>>>> I need to add any setting to make it work?
>>>>> I want to maintain the message count across the TMs.