I have implemented a custom File Source (a wrapper around the native Flink File Source) that calls signalNoMoreSplits according to a custom strategy, so that the Flink app terminates automatically.
Due to external restrictions, I am bound to Flink 1.13.3 in Streaming mode with unbounded input. I cannot use Batch mode or Streaming mode with bounded input because of the bug [Ref <https://flink.apache.org/2022/07/11/flip-147-support-checkpoints-after-tasks-finished-part-one/>].

Custom File Source Strategy:
The custom File Source listens to checkpoint start and checkpoint complete calls. It waits until FileSplitAssigner.remainingSplits().size() is 0 (i.e. the final file split has been assigned), after which it waits for N (e.g. 4) further checkpoint complete notifications (following the checkpoint subsuming contract). Once those N checkpoints have completed, it calls signalNoMoreSplits for the source readers.

Other app details:
Unaligned checkpoints are disabled.
Graph: Custom File Source (S3) -> Process Function -> Native File Sink (S3)

Problem:
I have noticed that checkpoints started after the last file split was assigned complete even though the downstream operators are still busy processing records. According to the checkpointing documentation [Ref <https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/learn-flink/fault_tolerance/>], the checkpoint barrier clearly delineates the events read before the checkpoint from those read after it. So my expectation is that a checkpoint started after the last file split was assigned will only complete once all operators are done processing the records. Because this is not what happens, records are missing from the output.

Additional notes:
The custom File Source also implements an "Ordered Source", that is, it reads sources in order (e.g. read seed data first to seed the job state before processing regular data) using the same strategy described above.

*Questions*
1. Is there a flaw in my understanding of checkpointing?
2. Are there any other ways to "verify" that the job is done reading all records and is inactive, from the source or from any other operator?
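
To make the strategy described above concrete, here is a simplified, Flink-free sketch of the counting logic. It is not the actual implementation: the class name TerminationTracker and its methods are hypothetical, and it assumes that only checkpoints started after the last split was assigned are counted. In the real source, the three callbacks would be driven by the split assigner and the checkpoint notifications, and a `true` return value would be the point at which signalNoMoreSplits is called.

```java
/**
 * Hypothetical model of the "wait for N checkpoints after the last split"
 * strategy. It has no Flink dependencies; the caller is expected to wire
 * these methods to the enumerator's split assignment and checkpoint hooks.
 */
final class TerminationTracker {
    private final int requiredCheckpoints;          // N in the description
    private boolean allSplitsAssigned = false;
    private long latestStartedCheckpointId = -1;
    private long firstEligibleCheckpointId = Long.MAX_VALUE;
    private int completedAfterLastSplit = 0;

    TerminationTracker(int requiredCheckpoints) {
        this.requiredCheckpoints = requiredCheckpoints;
    }

    /** Call on every checkpoint start. */
    void onCheckpointStart(long checkpointId) {
        latestStartedCheckpointId = Math.max(latestStartedCheckpointId, checkpointId);
    }

    /** Call after each assignment with the assigner's remaining split count. */
    void onSplitAssigned(int remainingSplits) {
        if (remainingSplits == 0 && !allSplitsAssigned) {
            allSplitsAssigned = true;
            // Assumption: only checkpoints that start after this point count.
            firstEligibleCheckpointId = latestStartedCheckpointId + 1;
        }
    }

    /**
     * Call on every checkpoint complete notification.
     * Returns true once N eligible checkpoints have completed, i.e. when
     * the source would signal no more splits to the readers.
     */
    boolean onCheckpointComplete(long checkpointId) {
        if (allSplitsAssigned && checkpointId >= firstEligibleCheckpointId) {
            completedAfterLastSplit++;
        }
        return completedAfterLastSplit >= requiredCheckpoints;
    }
}
```

With N = 2, a checkpoint that started before the last split was assigned is ignored, and the tracker reports "done" on the second completed checkpoint that started afterwards. This counts notifications only; it does not know whether downstream operators have finished processing, which is exactly the assumption my questions are about.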
Ref: StackOverflow link <https://stackoverflow.com/questions/77285227/flink-unexpected-checkpoint-behavior>

Best,
Abhishek