gaoyunhaii opened a new pull request #14831: URL: https://github.com/apache/flink/pull/14831
## What is the purpose of the change

This PR makes `CheckpointBarrierHandler` insert barriers for channels that have received `EndOfPartition` but have not processed it yet.

When an input channel receives `EndOfPartition`, it inserts a prioritized `FinalizeBarrier` at the head of its queue. This tells the `CheckpointBarrierHandler` that the channel has received `EndOfPartition` and which barrier id it expects next.

On the `CheckpointBarrierHandler` side, a new component, `FinalizeBarrierComplementProcessor`, handles `FinalizedBarrier` and inserts barriers:

1. When a checkpoint starts, either because a barrier was received from another channel or because of an RPC trigger, it inserts barriers for all input channels that have received `EndOfPartition`.
2. When it receives a `FinalizeBarrier` from an input channel, it inserts barriers into that channel for all pending checkpoints.

It also records the next expected barrier id for each channel, so it never inserts stale barriers.

**LocalInputChannel**

Previously, `LocalInputChannel` pulled buffers directly from the `ResultPartition` and did not cache any. With inserted barriers, `LocalInputChannel` now has to cache some buffers. We use a FIFO queue for this cache instead of a prioritized queue, which keeps the notification to the InputGates simple (e.g., there is no need to call `notifyPrioritizedEvent` for the `LocalInputChannel`, as before).

## Brief change log

- 7b5bcb2aea349560a6fac8d07e879f6e135f2e67 introduces the `FinalizedBarrier` event.
- 6f78a9a3c18c8ed8c43cfe6213e80c793e84f5b3 makes InputChannels insert a `FinalizedBarrier` before `EndOfPartition` and supports inserting barriers before `EndOfPartition`.
- 1240dbb9879bedee17ac69d69ab8fd514589febd modifies `CheckpointBarrierHandler` to insert barriers when needed and to support alignment with `EndOfPartition`.

## Verifying this change

This change adds tests and can be verified via the new unit tests for input channels and checkpoint barrier handlers.
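To illustrate the channel-side behavior described in the purpose section (a prioritized `FinalizeBarrier` jumping to the head of the queue, and barriers being inserted before `EndOfPartition`), here is a minimal, hypothetical model. It is not Flink's `InputChannel` API: `ChannelQueueSketch` and its methods are invented for illustration, strings stand in for buffers and events, and it assumes checkpoint ids are consecutive so that "next expected barrier id" is simply the last seen id plus one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical, simplified model of an input channel's queue (not Flink's InputChannel API).
 * Strings stand in for buffers and events.
 */
class ChannelQueueSketch {
    private static final String END_OF_PARTITION = "EndOfPartition";

    private final Deque<String> queue = new ArrayDeque<>();
    // Id of the last barrier enqueued on this channel; -1 means none yet.
    private long lastBarrierId = -1;

    void onData(String record) {
        queue.addLast(record);
    }

    void onBarrier(long checkpointId) {
        queue.addLast("barrier-" + checkpointId);
        lastBarrierId = checkpointId;
    }

    /** EndOfPartition is enqueued in order; the FinalizeBarrier jumps to the head of the queue. */
    void onEndOfPartition() {
        queue.addLast(END_OF_PARTITION);
        // Assumption: checkpoint ids are consecutive, so the next expected id is last + 1.
        queue.addFirst("FinalizeBarrier(next=" + (lastBarrierId + 1) + ")");
    }

    /** A barrier requested by the handler is placed just before the trailing EndOfPartition. */
    void insertBarrierBeforeEndOfPartition(long checkpointId) {
        if (!END_OF_PARTITION.equals(queue.peekLast())) {
            throw new IllegalStateException("EndOfPartition not received yet");
        }
        queue.pollLast();
        queue.addLast("barrier-" + checkpointId);
        queue.addLast(END_OF_PARTITION);
        lastBarrierId = checkpointId;
    }

    /** Queue contents from head to tail. */
    List<String> snapshot() {
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        ChannelQueueSketch channel = new ChannelQueueSketch();
        channel.onData("a");
        channel.onBarrier(3);
        channel.onData("b");
        channel.onEndOfPartition();
        channel.insertBarrierBeforeEndOfPartition(4);
        System.out.println(channel.snapshot());
    }
}
```

Running `main` prints `[FinalizeBarrier(next=4), a, barrier-3, b, barrier-4, EndOfPartition]`. Only the `FinalizeBarrier` overtakes queued data; everything else, including inserted barriers, stays in FIFO order ahead of `EndOfPartition`.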
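The two insertion cases handled by `FinalizeBarrierComplementProcessor`, together with the "no stale barriers" rule, can be sketched as a small state machine. This is a hypothetical model under stated assumptions, not the actual class from this PR: `BarrierComplementSketch`, `onCheckpointStart`, `onFinalizeBarrier`, and `onCheckpointFinished` are invented names, channels are plain `int`s, and an inserted barrier is just recorded as the string `"channel:checkpointId"`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/**
 * Hypothetical, simplified model of the barrier complement logic (not Flink's actual
 * FinalizeBarrierComplementProcessor). Inserted barriers are recorded as "channel:checkpointId".
 */
class BarrierComplementSketch {
    // Finished channel -> next checkpoint id it still needs a barrier for.
    private final Map<Integer, Long> nextExpectedBarrierId = new TreeMap<>();
    // Checkpoints that have started but are not yet finished (ascending order).
    private final TreeSet<Long> pendingCheckpoints = new TreeSet<>();
    private final List<String> inserted = new ArrayList<>();

    /** Case 1: a checkpoint starts (barrier from another channel or an RPC trigger). */
    void onCheckpointStart(long checkpointId) {
        pendingCheckpoints.add(checkpointId);
        for (int channel : new ArrayList<>(nextExpectedBarrierId.keySet())) {
            insertIfNotStale(channel, checkpointId);
        }
    }

    /** Case 2: a channel reports FinalizeBarrier together with its next expected barrier id. */
    void onFinalizeBarrier(int channel, long nextExpectedId) {
        nextExpectedBarrierId.put(channel, nextExpectedId);
        for (long checkpointId : pendingCheckpoints) {
            insertIfNotStale(channel, checkpointId);
        }
    }

    void onCheckpointFinished(long checkpointId) {
        pendingCheckpoints.remove(checkpointId);
    }

    private void insertIfNotStale(int channel, long checkpointId) {
        if (checkpointId < nextExpectedBarrierId.get(channel)) {
            return; // stale: the channel already carries a barrier for this checkpoint
        }
        inserted.add(channel + ":" + checkpointId);
        nextExpectedBarrierId.put(channel, checkpointId + 1);
    }

    List<String> inserted() {
        return inserted;
    }

    public static void main(String[] args) {
        BarrierComplementSketch processor = new BarrierComplementSketch();
        processor.onCheckpointStart(5);     // no finished channels yet
        processor.onFinalizeBarrier(0, 5);  // channel 0 finished before seeing barrier 5
        processor.onFinalizeBarrier(1, 6);  // channel 1 already forwarded barrier 5
        processor.onCheckpointStart(6);     // both finished channels get barrier 6
        System.out.println(processor.inserted());
    }
}
```

The demo prints `[0:5, 0:6, 1:6]`: channel 1 does not get a second barrier for checkpoint 5 because its recorded next expected id is already 6. Keeping pending checkpoints in a sorted set means a late `FinalizeBarrier` receives its missing barriers in ascending checkpoint order.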
## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **no**
- The S3 file system connector: **no**

## Documentation

- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **not applicable**