I have implemented a custom File Source (a wrapper around the native Flink
File Source) that signals NoMoreSplits using a custom strategy, which
terminates the Flink app automatically.

Due to external restrictions, I am bound to Flink 1.13.3 in Streaming mode
with Unbounded input, and cannot use Batch mode or Streaming mode with
Bounded input due to the bug [Ref
<https://flink.apache.org/2022/07/11/flip-147-support-checkpoints-after-tasks-finished-part-one/>
].

Custom File Source Strategy: The custom File Source listens to checkpoint
start and checkpoint complete calls. It waits until
FileSplitAssigner.remainingSplits().size() is 0 (i.e. the final file split
has been assigned), after which it additionally waits for N (e.g. 4)
checkpoint complete notifications (following the checkpoint subsuming
contract). After N checkpoints have completed, it signals NoMoreSplits to
the source readers.
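
To make the strategy above concrete, here is a minimal sketch of the
counting logic on its own, without any Flink dependencies. NoMoreSplitsGate
and its method names are hypothetical (they are not Flink APIs and not my
exact code); in the real source the two callbacks would be driven by the
enumerator's checkpoint start and complete hooks, and a true return value
would be the point at which NoMoreSplits is signalled to the readers.

```java
/**
 * Hypothetical helper (not part of Flink): decides when it is time to
 * signal NoMoreSplits, based on checkpoint start/complete callbacks.
 */
final class NoMoreSplitsGate {
    private final int requiredCheckpoints;        // the "N" in the description
    private long firstEligibleCheckpointId = -1L; // first checkpoint started after the last split was assigned
    private long lastCountedCheckpointId = -1L;   // guards against duplicate or stale notifications
    private int completedAfterDrain = 0;
    private boolean signalled = false;

    NoMoreSplitsGate(int requiredCheckpoints) {
        if (requiredCheckpoints < 1) {
            throw new IllegalArgumentException("requiredCheckpoints must be >= 1");
        }
        this.requiredCheckpoints = requiredCheckpoints;
    }

    /** Call when a checkpoint starts; remainingSplits is the assigner's pending split count. */
    void onCheckpointStart(long checkpointId, int remainingSplits) {
        if (remainingSplits == 0 && firstEligibleCheckpointId < 0) {
            firstEligibleCheckpointId = checkpointId;
        }
    }

    /** Call on checkpoint completion; returns true exactly once, when N eligible checkpoints have completed. */
    boolean onCheckpointComplete(long checkpointId) {
        if (signalled
                || firstEligibleCheckpointId < 0
                || checkpointId < firstEligibleCheckpointId
                || checkpointId <= lastCountedCheckpointId) {
            return false;
        }
        lastCountedCheckpointId = checkpointId;
        completedAfterDrain++;
        if (completedAfterDrain >= requiredCheckpoints) {
            signalled = true;
            return true;
        }
        return false;
    }
}
```

Note that this gate only observes checkpoint completion; by itself it says
nothing about whether downstream operators have finished processing the
records that were emitted before each barrier.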

Other app details: Unaligned checkpoints are disabled.

Graph: Custom File Source (S3) -> Process Function -> Native File Sink (S3)

Problem: I have noticed that checkpoints started after the last file split
was assigned complete even though the downstream operators are still busy
processing records. According to the checkpointing documentation [Ref
<https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/learn-flink/fault_tolerance/>],
the checkpoint barrier clearly delineates the events read before and after
the checkpoint. So my expectation is that checkpoints started after the
last file split was assigned will only complete once all operators are done
processing the records that precede the barrier. Because this does not
hold, there are missing records in the output.

Additional notes: The custom File Source also implements an "Ordered
Source", i.e. it reads sources in order (e.g. read seed data first to seed
the job state before processing regular data), using the same strategy
described above.

*Questions*
1. Is there a flaw in my understanding of checkpointing?
2. Are there any other ways to verify, from the Source or any other
operator, that the job is done reading all records and is inactive?


Ref: StackOverflow Link
<https://stackoverflow.com/questions/77285227/flink-unexpected-checkpoint-behavior>

Best,
Abhishek
