Hey Min, If checking for empty map states works for you, this could be an option, yes. Alternatively, check this out:
CheckpointedFunction.initializeState() will pass you a FunctionInitializationContext, which has a method called ".isRestored()" Best, Robert On Thu, Dec 5, 2019 at 10:18 AM <min....@ubs.com> wrote: > Dear Robert, > > > > Thank you very much for sending your reply. > > > > What we try to achieve is that > > 1) In a normal situation, checkpoints or save points are preserved, > an application restarts from one of these paths (with configurations are > kept in Map states). > > 2) Sometimes, e.g. during a version update (with a modified Kafka > topic name), we could be un able to re start the application from one of > these checkpoints or savepoints. We have to re start from scratch. > > > > We like to detect if a job starts from a checkpoint or starts from a > scratch inside the main method. > > Then we can let a broadcast stream reading from the "earliest" or "latest" > of a Kafka topic for configurations. > > > > Or should we just check if the map states are empty insead? > > > > Regards, > > > > Min > > > > *From:* Robert Metzger [mailto:rmetz...@apache.org] > *Sent:* Donnerstag, 5. Dezember 2019 09:47 > *To:* Tan, Min > *Cc:* vino yang; user > *Subject:* [External] Re: Access to CheckpointStatsCounts > > > > Hey Min, > > > > when a Flink job recovers after a failure, the main method is not > re-executed. The main method is only executed for the submission of the > job. The JobManager executing the job is receiving the final job graph. On > failure, the JobManager will restore the job based on the job graph. > > If you start a Flink job from scratch, it will use the current > configuration. On a failure recovery, the Flink job will rely on the > checkpoint. > > > > Please let me know if your problem has been resolved with this > information. If not, I need a bit more information on what you are trying > to achieve, so that I can help you better. > > > > Best, > > Robert > > > > > > On Mon, Dec 2, 2019 at 10:12 AM <min....@ubs.com> wrote: > > Many thanks for sending your reply. > > > > It is not for monitoring but for configuration. > > > > For a job starting from an empty status, we like to load the fresh > configurations. > > For a job recovering from a checkpoint, we like to rely on the checkpoint. > > > > Regards, > > > > Min > > > > *From:* vino yang [mailto:yanghua1...@gmail.com] > *Sent:* Montag, 2. Dezember 2019 10:09 > *To:* Tan, Min > *Cc:* user > *Subject:* [External] Re: Access to CheckpointStatsCounts > > > > Hi min, > > > > If it is only for monitoring purposes, you can just use checkpoint REST > API[1] to do this work. > > > > [1]: > https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-jobid-checkpoints > > > > Best, > > Vino > > > > <min....@ubs.com> 于2019年12月2日周一 下午5:01写道: > > Hi, > > > > Just wonder how to access the CheckpointStatsCoutns from the main method > of a job? > > > > We need to detect if a job recovers from a checkpoint or starts from an > empty status. > > > > Regards, > > > > Min > >