GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/2773
[backport] [FLINK-4975] [checkpointing] Add a limit for how much data may be buffered in alignment

## This is a backport of #2754 to the 1.1.x branch.
## This already incorporates the feedback on #2754

-----

In corner-case situations, checkpoint alignment can take very long and buffer/spill a lot of data. This PR introduces a limit on how much data may be buffered during an alignment. If that volume is exceeded, the checkpoint is aborted.

While such overly large alignments should not occur in a healthy environment, the limit is an important safety net to have.

This Pull Request consists of three parts:

### Introduce Cancellation Barriers

These *Cancellation Barriers* are like checkpoint barriers, flowing with the data, but they signal that a checkpoint should be aborted rather than marking the position of that stream in the checkpoint.

This adds extensive tests to the `BarrierBuffer` and `BarrierTracker`, verifying that these Cancellation Barriers are correctly interpreted and interplay well with other situations of alignment starts and cancellations (such as when newer barriers come early).

### Adjust Tasks and Checkpoint Coordinator

Tasks emit cancellation barriers whenever they cannot start a checkpoint or whenever a checkpoint alignment was canceled. That lets downstream tasks know earlier that they should stop the alignment for that checkpoint, because it will not be able to complete.

Tasks also explicitly send "decline" messages to the checkpoint coordinator for checkpoints they "skipped" because the alignment was canceled or superseded.

Previously the assumptions were:

  - When a Source Task cannot start a checkpoint, a new checkpoint must be triggered immediately, to dissolve any started downstream alignments that otherwise would not be able to complete.
  - Whenever an alignment is aborted by a newer checkpoint barrier coming in, that newer barrier will eventually reach the downstream task and break outdated pending alignments.

The cancellation barrier will now break the outdated alignment earlier.

### Alignment Size Limit

When the `BarrierBuffer` has buffered more than a certain number of bytes, it aborts the alignment and signals the Task that the checkpoint was aborted. The Task sends a cancellation barrier for that checkpoint downstream, to signal to the downstream tasks that they should not wait for a proper barrier.

The maximum alignment size is a config option: `task.checkpoint.alignment.max-size`

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink backport

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2773.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2773

----
commit a1f028dee49928ada014632bb27216b36e30250e
Author: Stephan Ewen <se...@apache.org>
Date:   2016-10-23T16:41:32Z

    [FLINK-4984] [checkpointing] Add Cancellation Barriers as a way to signal aborted checkpoints

commit b643edf1ace88b34c9cea5e892c440ad114a46fe
Author: Stephan Ewen <se...@apache.org>
Date:   2016-11-03T14:28:15Z

    [FLINK-4975] [checkpointing] Add a limit for how much data may be buffered in alignment.

    If more data than the defined amount is buffered, the alignment is aborted and the checkpoint canceled.

commit 0d890024299aecd3279d9f033415a206622e0425
Author: Stephan Ewen <se...@apache.org>
Date:   2016-11-08T16:13:19Z

    [FLINK-4985] [checkpointing] Report canceled / declined checkpoints to the Checkpoint Coordinator

----
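
For readers who want a concrete picture of the alignment size limit and the cancellation barriers described above, here is a minimal Java sketch. It is not the `BarrierBuffer` code from this pull request: the class `AlignmentLimitSketch`, its methods, and the `Outcome` enum are hypothetical names chosen for illustration, and treating a non-positive limit as "no limit" is an assumption. The sketch only counts bytes; it does not buffer or spill any data.

```java
import java.util.Arrays;

/** Hypothetical, simplified model of barrier alignment with a size limit (not Flink's BarrierBuffer). */
final class AlignmentLimitSketch {

    /** What the task should do after an input event (illustrative names). */
    enum Outcome { CONTINUE, CHECKPOINT_READY, ABORTED_SEND_CANCEL_BARRIER }

    private final boolean[] blocked;      // channels that already delivered the current barrier
    private final long maxBufferedBytes;  // assumption: a value <= 0 disables the limit
    private long currentCheckpointId = -1L;
    private boolean aligning;
    private int barriersReceived;
    private long bufferedBytes;

    AlignmentLimitSketch(int numChannels, long maxBufferedBytes) {
        this.blocked = new boolean[numChannels];
        this.maxBufferedBytes = maxBufferedBytes;
    }

    /** A checkpoint barrier arrived on the given channel. */
    Outcome onBarrier(int channel, long checkpointId) {
        // Ignore barriers of older checkpoints and of a checkpoint that already finished or aborted.
        if (checkpointId < currentCheckpointId
                || (checkpointId == currentCheckpointId && !aligning)) {
            return Outcome.CONTINUE;
        }
        if (checkpointId > currentCheckpointId) {
            // A newer barrier supersedes any pending alignment and starts a new one.
            release();
            currentCheckpointId = checkpointId;
            aligning = true;
        }
        if (!blocked[channel]) {
            blocked[channel] = true;
            barriersReceived++;
        }
        if (barriersReceived == blocked.length) {
            release();
            return Outcome.CHECKPOINT_READY;
        }
        return Outcome.CONTINUE;
    }

    /** Data arrived; only data on blocked channels counts toward the alignment volume. */
    Outcome onData(int channel, long numBytes) {
        if (aligning && blocked[channel]) {
            bufferedBytes += numBytes;
            if (maxBufferedBytes > 0 && bufferedBytes > maxBufferedBytes) {
                release();
                return Outcome.ABORTED_SEND_CANCEL_BARRIER;
            }
        }
        return Outcome.CONTINUE;
    }

    /** A cancellation barrier arrived: stop aligning for that checkpoint and forward the cancellation. */
    Outcome onCancellationBarrier(long checkpointId) {
        if (checkpointId < currentCheckpointId
                || (checkpointId == currentCheckpointId && !aligning)) {
            return Outcome.CONTINUE;
        }
        currentCheckpointId = checkpointId;
        release();
        return Outcome.ABORTED_SEND_CANCEL_BARRIER;
    }

    /** Unblocks all channels and resets the counters. */
    private void release() {
        Arrays.fill(blocked, false);
        barriersReceived = 0;
        bufferedBytes = 0L;
        aligning = false;
    }

    public static void main(String[] args) {
        AlignmentLimitSketch sketch = new AlignmentLimitSketch(2, 100L);
        System.out.println(sketch.onBarrier(0, 1L)); // alignment for checkpoint 1 starts
        System.out.println(sketch.onData(0, 60L));   // counted, still below the limit
        System.out.println(sketch.onData(0, 50L));   // limit exceeded, alignment aborts
    }
}
```

In this model, the `ABORTED_SEND_CANCEL_BARRIER` outcome stands for the point where the Task would emit a cancellation barrier downstream and send a "decline" message to the checkpoint coordinator.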