gaoyunhaii opened a new pull request #14831:
URL: https://github.com/apache/flink/pull/14831


   ## What is the purpose of the change
   
   This PR makes `CheckpointBarrierHandler` insert barriers for channels that 
have received `EndOfPartition` but have not processed it yet.
   
   When an input channel receives `EndOfPartition`, it inserts a prioritized 
`FinalizeBarrier` at the head of its queue, which tells 
`CheckpointBarrierHandler` that this channel has received `EndOfPartition` and 
what its next expected barrier id is.
   
   For `CheckpointBarrierHandler`, this PR introduces a new component, 
`FinalizeBarrierComplementProcessor`, to handle `FinalizedBarrier` and insert 
barriers:
   1. When a checkpoint starts, either because a barrier arrived from another 
channel or because of an RPC trigger, it inserts barriers for all input 
channels that have received `EndOfPartition`.
   2. When it receives a `FinalizeBarrier` from an input channel, it inserts 
barriers for all pending checkpoints on that channel.
   
   It also records the next expected barrier id for each channel so that it 
does not insert stale barriers.
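   
   The two cases above can be sketched roughly as follows. This is only an 
illustration of the bookkeeping, not the code in this PR: the class 
`BarrierComplementSketch`, its method names, and the use of a `TreeSet` of 
pending checkpoint ids are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical sketch; not the FinalizeBarrierComplementProcessor from the PR. */
final class BarrierComplementSketch {
    // Next barrier id expected by each channel that has announced EndOfPartition.
    private final Map<Integer, Long> nextExpectedBarrierId = new TreeMap<>();
    // Checkpoints that have started and are still pending, in ascending order.
    private final TreeSet<Long> pendingCheckpoints = new TreeSet<>();

    /** Case 1: returns the finished channels that get a barrier for this checkpoint. */
    List<Integer> onCheckpointStarted(long checkpointId) {
        pendingCheckpoints.add(checkpointId);
        List<Integer> channels = new ArrayList<>();
        for (Map.Entry<Integer, Long> entry : nextExpectedBarrierId.entrySet()) {
            // Skip stale barriers: the channel already moved past this id.
            if (checkpointId >= entry.getValue()) {
                channels.add(entry.getKey());
                entry.setValue(checkpointId + 1);
            }
        }
        return channels;
    }

    /** Case 2: returns the pending checkpoint ids that get a barrier on this channel. */
    List<Long> onFinalizeBarrier(int channel, long nextExpected) {
        // Only pending checkpoints at or after the expected id are not stale.
        List<Long> inserted = new ArrayList<>(pendingCheckpoints.tailSet(nextExpected, true));
        long next = inserted.isEmpty()
                ? nextExpected
                : inserted.get(inserted.size() - 1) + 1;
        nextExpectedBarrierId.put(channel, next);
        return inserted;
    }

    /** Drops a checkpoint once it is no longer pending (completed or aborted). */
    void onCheckpointFinished(long checkpointId) {
        pendingCheckpoints.remove(checkpointId);
    }
}
```

   In this sketch a channel that announces next expected id 4 while 
checkpoints 3 and 4 are pending only gets a barrier for checkpoint 4, since 
a barrier for 3 would be stale.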
   
   **LocalInputChannel** 
   
   Previously, `LocalInputChannel` pulled buffers directly from 
`ResultPartition` and did not cache them. With inserted barriers, however, 
`LocalInputChannel` has to support caching some buffers. We would like to 
use a FIFO queue for this cache instead of a prioritized queue, which 
simplifies the notification to InputGates (e.g., as before, there is no need 
to notifyPrioritizedEvent for the `LocalInputChannel`).
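   
   A minimal sketch of the FIFO caching idea, under stated assumptions: the 
class `LocalChannelCacheSketch`, the use of strings as stand-ins for buffers 
and events, and the choice to start caching only once `EndOfPartition` is 
pulled are all hypothetical and are not taken from the PR's code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

/** Hypothetical sketch; not the LocalInputChannel changed in the PR. */
final class LocalChannelCacheSketch {
    static final String END_OF_PARTITION = "EndOfPartition";

    // Stand-in for the view on the ResultPartition that the channel pulls from.
    private final Iterator<String> partition;
    // Plain FIFO cache: elements leave in the order they were added.
    private final Deque<String> cache = new ArrayDeque<>();

    LocalChannelCacheSketch(Iterator<String> partition) {
        this.partition = partition;
    }

    /** Returns the next element, serving the cache first; null if nothing is left. */
    String poll() {
        if (cache.isEmpty() && partition.hasNext()) {
            String next = partition.next();
            if (!END_OF_PARTITION.equals(next)) {
                return next; // Normal path: pulled directly, never cached.
            }
            // Announce the end first, and hold EndOfPartition back in the cache.
            cache.addLast("FinalizeBarrier");
            cache.addLast(END_OF_PARTITION);
        }
        return cache.pollFirst();
    }

    /** Inserts a barrier just before the cached EndOfPartition, if there is one. */
    boolean insertBarrier(long checkpointId) {
        if (!END_OF_PARTITION.equals(cache.peekLast())) {
            return false;
        }
        cache.removeLast();
        cache.addLast("Barrier(" + checkpointId + ")");
        cache.addLast(END_OF_PARTITION);
        return true;
    }
}
```

   Because the cache is strictly first-in, first-out, the consumer sees the 
inserted barrier simply by polling in order, with no separate priority 
notification in this sketch.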
   
   ## Brief change log
   
   - 7b5bcb2aea349560a6fac8d07e879f6e135f2e67 introduces the `FinalizedBarrier` 
event.
   - 6f78a9a3c18c8ed8c43cfe6213e80c793e84f5b3 makes InputChannels insert 
`FinalizedBarrier` before EndOfPartition and support inserting barriers before 
EndOfPartition.
   - 1240dbb9879bedee17ac69d69ab8fd514589febd modifies the 
`CheckpointBarrierHandler` to support inserting barriers when needed and to 
support alignment with EndOfPartition.
   
   ## Verifying this change
   
   This change adds tests and can be verified via the added unit tests for 
input channels and checkpoint barrier handlers.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **no**
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **no**
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **no**
     - If yes, how is the feature documented? **not applicable**
   



