Hi Harsh,

The job ID is fixed as 00000000000000000000000000000000 when using HA mode 
with native k8s, which means the checkpoint path stays the same no matter 
how many times you submit.
However, when HA mode is enabled, the new job first recovers the last 
checkpoint from the HA checkpoint store. In other words, your new job 
should recover from the last checkpoint, chk-1. From your exceptions, we can 
see that the job did not recover successfully and started from scratch. That 
is why you hit the exception that the checkpoint _metadata file already exists.
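For reference, the checkpoint path is derived from the configured checkpoint 
directory plus the job ID, so with the constant job ID every run writes to the 
same location. A minimal config sketch (the cluster-id and s3:// paths are 
placeholders, not taken from your setup):

```yaml
# flink-conf.yaml (sketch; names and paths are placeholders)
kubernetes.cluster-id: my-flink-cluster
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://my-bucket/flink/ha
state.checkpoints.dir: s3://my-bucket/flink/flink-checkpoints
# Completed checkpoints land under:
#   <state.checkpoints.dir>/<job-id>/chk-<n>/_metadata
# With HA enabled the job ID is always 00000000000000000000000000000000,
# so the path repeats across runs.
```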

There are two likely reasons for this:

  1.  The HA checkpoint store did not recover successfully; you could check 
whether checkpoint 1 completed in the previous run.
  2.  The last checkpoint, chk-1, finished storing to the remote checkpoint 
path but failed to be added to the checkpoint store. However, the checkpoint 
coordinator cleans up the checkpoint metadata if it fails to be added to the 
checkpoint store [1], unless your job crashed or hit a 
PossibleInconsistentStateException [2].

I think you should check the jobmanager log of your last run to find the root 
cause.
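One way to do that (a sketch; the deployment name is a placeholder and the 
sample log line below is only an approximation of what the checkpoint 
coordinator logs) is to grep the jobmanager log for the checkpoint-completion 
message:

```shell
# Pull the jobmanager log and look for checkpoint activity
# (deployment name is a placeholder for your setup):
#   kubectl logs deploy/flink-jobmanager | grep -i 'checkpoint'
# A successfully completed checkpoint 1 is logged roughly like the sample
# line below, which the grep matches:
log='Completed checkpoint 1 for job 00000000000000000000000000000000 (4821 bytes in 312 ms).'
echo "$log" | grep -q 'Completed checkpoint 1' && echo 'checkpoint 1 completed in previous run'
```

If no such line appears for checkpoint 1, that points to reason 1 above.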

[1] 
https://github.com/apache/flink/blob/46bf6d68ee97684949ba3ad38dc18ff7c800092a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1233
[2] 
https://github.com/apache/flink/blob/46bf6d68ee97684949ba3ad38dc18ff7c800092a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1226


Best
Yun Tang
________________________________
From: Manong Karl <abc549...@gmail.com>
Sent: Wednesday, August 4, 2021 9:17
To: Harsh Shah <harsh.a.s...@shopify.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Flink k8 HA mode + checkpoint management

Can you please share your configs? I'm using native Kubernetes without HA and 
there are no issues. I'm curious how this happens. AFAIK the job ID is 
generated randomly.


Harsh Shah <harsh.a.s...@shopify.com> wrote on Wed, Aug 4, 2021 at 2:44 AM:
Hello,

I am trying to use Flink HA mode inside 
kubernetes<https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/>
 in 
standalone<https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#application-mode>
 mode. The Job ID is always constant, "00000000000000000000000000000000". In 
situations where we restart the job (not from a checkpoint or savepoint), we 
see errors like
"""

Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
'<PATH>/flink-checkpoints/00000000000000000000000000000000/chk-1/_metadata' 
already exists

"""
where no checkpoints have been created since the restart of the job.

My questions:
* Is the recommended way to set a new unique "checkpoint path" every time we 
update the job and restart the necessary k8s resources (say, when not 
restarting from a checkpoint/savepoint)? Or to GC checkpoints during deletion 
and reload from a savepoint if required? Looking for a standard recommendation.
* Is there a way I can override the Job ID to be unique and indicate that it 
is a complete restart in HA mode?


Thanks,
Harsh
