Hi Team,

We are trying to make sure we do not lose data when our Kinesis consumer is down.
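For context, the consumer source is wired up roughly like this (a sketch; the stream name and region are placeholders, and we set no explicit start position, so the connector default of LATEST applies):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

Properties consumerConfig = new Properties();
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); // placeholder region
// Note: ConsumerConfigConstants.STREAM_INITIAL_POSITION is not set here,
// so a fresh job submission starts reading at LATEST.
DataStream<String> input = env.addSource(
        new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), consumerConfig));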
The Kinesis streaming job has the following checkpointing properties:

// checkpoint every X msecs
env.enableCheckpointing(Conf.getFlinkCheckpointInterval());

// enable externalized checkpoints that are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// prefer checkpoints for recovery even when there is a more recent savepoint
env.getCheckpointConfig().setPreferCheckpointForRecovery(Conf.isFlinkCheckpointPreferCheckPoint());

// enable the experimental unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();

// checkpoint path on S3, with asynchronous snapshots
env.setStateBackend(new FsStateBackend(Conf.getFlinkCheckPointPath(), true));

What we did:
1) Killed the Kinesis job.
2) Sent messages to KDS while the consumer was down.
3) Restarted the Flink consumer. The messages sent during the down period were never ingested (data loss).
4) Re-sent messages to KDS while the consumer was back up. Those messages ingested fine.

How can we avoid the data loss in step 3?

From the logs:

2021-04-07 12:15:49,161 INFO org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: File State Backend (checkpoints: 's3://bucket-xx/flink/checkpoint/iv', savepoints: 'null', asynchronous: TRUE, fileStateThreshold: -1)
2021-04-07 12:16:02,343 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 591 ms).
2021-04-07 12:16:11,951 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 (type=CHECKPOINT) @ 1617797771751 for job 8943d16e22b8aaf65d6b9e2b8bd54113.
2021-04-07 12:16:12,483 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 411 ms).

Thanks,
Vijay
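P.S. My current guess, in case it sharpens the question: because we re-submit the restarted job fresh (not from the retained externalized checkpoint), the Kinesis consumer has no stored shard offsets and falls back to the connector's default start position of LATEST, skipping everything sent while it was down. One workaround we are considering is pinning the start position explicitly; a sketch, using consumerConfig from the setup above (ConsumerConfigConstants is from the Flink Kinesis connector; note TRIM_HORIZON re-reads from the start of the retention window rather than resuming exactly where we stopped):

// Possible workaround (sketch): start from the oldest reachable record instead of LATEST.
// This avoids missing records after a fresh submission, but may re-ingest already-processed ones.
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

Is that a reasonable approach, or should we always resume the job from the retained checkpoint instead?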