Hi Team,

We recently enabled checkpointing in our Flink job, using FsStateBackend
pointing to an S3 bucket.

Below is the sample code that enables checkpointing through the application
code. We are using Flink version 1.12.2.

// File-system state backend on S3; "true" enables asynchronous snapshots
env.setStateBackend(new FsStateBackend("s3://flinkcheckpointing/job-name/", true));
// Trigger a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// Register a Kryo serializer for java.util.Collections$UnmodifiableCollection
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
env.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
CheckpointConfig config = env.getCheckpointConfig();
// Keep externalized checkpoints when the job is cancelled
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

Our question is: do we still need to set the config below in flink-conf.yaml
for checkpointing to work?

state.checkpoints.dir: s3://prod-flink-checkpointing/checkpoint-metadata/
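
For comparison, here is a rough sketch of what we understand the
flink-conf.yaml equivalent of the application code above to be. The key names
are the ones we believe Flink 1.12 documents. The checkpoint directory simply
reuses the path from our code as an illustration, and we have not verified
that the file-based and code-based settings behave identically.

```yaml
# Sketch only: assumed flink-conf.yaml counterpart of the Java settings above.
state.backend: filesystem
# Illustrative path, copied from the FsStateBackend URI in the code.
state.checkpoints.dir: s3://flinkcheckpointing/job-name/
# Same 1000 ms interval as env.enableCheckpointing(1000).
execution.checkpointing.interval: 1s
execution.checkpointing.mode: AT_LEAST_ONCE
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
```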


Thanks,
Sudhansu
