GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/2773

    [backport] [FLINK-4975] [checkpointing] Add a limit for how much data may be buffered in alignment

    ## This is a backport of #2754 to the 1.1.x branch.
    ## This already incorporates the feedback on #2754
    
    -----
    
    In corner-case situations, checkpoint alignment can take a very long time and buffer or spill a lot of data. This PR introduces a limit on how much data may be buffered during alignment. If that volume is exceeded, the checkpoint is aborted.
    
    While such overly large alignments should not occur in a healthy environment, the limit is an important safety net to have.
    
    This Pull Request consists of three parts:
    
    ### Introduce Cancellation Barriers
    
    *Cancellation Barriers* are like checkpoint barriers in that they flow with the data. However, instead of marking the position of that stream in the checkpoint, they signal that the checkpoint should be aborted.
    
    This adds extensive tests verifying that the `BarrierBuffer` and `BarrierTracker` interpret these Cancellation Barriers correctly and that they interplay well with other cases of alignment starts and cancellations (such as newer barriers arriving early).
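
The following hypothetical, single-threaded Java sketch illustrates how an aligner could interpret regular barriers, cancellation barriers, and newer barriers that arrive early. `SimpleAligner` and its methods are made up for this example and are not Flink's `BarrierBuffer` API. The sketch only tracks which channels are blocked and logs its decisions. It does not buffer data or notify a task.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, heavily simplified aligner in the spirit of BarrierBuffer (not Flink's code):
// single-threaded, no actual data buffering or spilling, string log instead of task callbacks.
final class SimpleAligner {
    private final int numChannels;
    private final Set<Integer> blockedChannels = new HashSet<>();
    private long currentId = -1;      // newest checkpoint id seen so far
    private boolean aligning = false; // true while waiting for the barriers of currentId
    final List<String> log = new ArrayList<>();

    SimpleAligner(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Data arriving on a blocked channel would be buffered until the alignment ends. */
    boolean isBlocked(int channel) {
        return aligning && blockedChannels.contains(channel);
    }

    /** A regular checkpoint barrier for checkpoint `id` arrives on `channel`. */
    void onBarrier(int channel, long id) {
        if (id > currentId) {
            // A newer checkpoint starts; it supersedes any alignment still in progress.
            if (aligning) {
                abortAlignment("superseded");
            }
            currentId = id;
            aligning = true;
            blockedChannels.add(channel);
        } else if (id == currentId && aligning) {
            blockedChannels.add(channel);
        } else {
            return; // barrier of an older, completed, or aborted checkpoint: ignore it
        }
        if (blockedChannels.size() == numChannels) {
            log.add("trigger checkpoint " + id);
            aligning = false;
            blockedChannels.clear();
        }
    }

    /** A cancellation barrier for checkpoint `id` arrives on `channel`. */
    void onCancellationBarrier(int channel, long id) {
        if (id > currentId) {
            // No alignment for `id` has started yet: abort any older alignment and remember
            // `id`, so that regular barriers for it arriving later are ignored.
            if (aligning) {
                abortAlignment("superseded");
            }
            currentId = id;
            log.add("abort checkpoint " + id + " (cancellation barrier)");
        } else if (id == currentId && aligning) {
            abortAlignment("cancellation barrier");
        }
        // Otherwise the checkpoint already completed or was aborted: nothing to do.
    }

    private void abortAlignment(String reason) {
        log.add("abort checkpoint " + currentId + " (" + reason + ")");
        aligning = false;
        blockedChannels.clear(); // a real buffer would release its buffered data here
    }
}
```

A cancellation barrier for a checkpoint that has not been seen yet is remembered. As a result, a regular barrier for that checkpoint arriving later on another channel does not start an alignment that can never complete.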
    
    ### Adjust Tasks and Checkpoint Coordinator
    
    Tasks emit cancellation barriers whenever they cannot start a checkpoint or whenever a checkpoint alignment was canceled. This lets downstream tasks know earlier that they should stop the alignment for that checkpoint, because the checkpoint cannot complete.
    
    Tasks also explicitly send "decline" messages to the checkpoint coordinator for checkpoints they "skipped" because the alignment was canceled or superseded.
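
The task-side reactions described above can be sketched as follows. The sketch is hypothetical: `SketchTask`, `onCheckpointTriggered`, and `onAlignmentAborted` are made-up names, and the strings appended to two lists stand in for the network events sent downstream and the messages sent to the checkpoint coordinator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: strings stand in for real network events and coordinator messages.
final class SketchTask {
    final List<String> downstream = new ArrayList<>();  // events sent to downstream tasks
    final List<String> coordinator = new ArrayList<>(); // messages sent to the coordinator
    private final boolean readyForCheckpoints;

    SketchTask(boolean readyForCheckpoints) {
        this.readyForCheckpoints = readyForCheckpoints;
    }

    /** Called when alignment completed or, for a source task, when a checkpoint is triggered. */
    void onCheckpointTriggered(long id) {
        if (!readyForCheckpoints) {
            // Cannot start the checkpoint: let downstream tasks stop aligning for it early...
            downstream.add("cancel-barrier " + id);
            // ...and tell the coordinator that this checkpoint will not be acknowledged.
            coordinator.add("decline " + id + ": task not ready");
            return;
        }
        downstream.add("barrier " + id);
        // (the state snapshot would be taken here)
        coordinator.add("ack " + id);
    }

    /** Called when alignment for `id` was canceled, superseded, or exceeded its size limit. */
    void onAlignmentAborted(long id, String reason) {
        downstream.add("cancel-barrier " + id);
        coordinator.add("decline " + id + ": " + reason);
    }
}
```

Both failure paths do the same two things. They forward a cancellation barrier so downstream alignments end early, and they send an explicit decline so the coordinator does not have to wait for a timeout.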
    
    Previously, the assumptions were:
      - When a Source Task cannot start a checkpoint, a new checkpoint must be triggered immediately to dissolve any downstream alignments that have already started and would otherwise never complete.
      - Whenever an alignment is aborted because a newer checkpoint barrier comes in, that newer barrier will eventually reach the downstream tasks and break outdated pending alignments. With this PR, the cancellation barrier breaks the outdated alignment earlier.
    
    ### Alignment Size Limit
    
    When the `BarrierBuffer` has buffered more than the configured number of bytes, it aborts the alignment and signals to the Task that the checkpoint was aborted. The Task then sends a cancellation barrier for that checkpoint downstream, so that downstream tasks do not keep waiting for a regular checkpoint barrier.
    
    The maximum alignment size is a config option: `task.checkpoint.alignment.max-size`
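
A byte-limited alignment buffer could look roughly like the following sketch. It is hypothetical: `BoundedAlignmentBuffer` and its methods are made-up names. The limit is passed to the constructor rather than read from `task.checkpoint.alignment.max-size`. The sketch assumes that the limit is in bytes and that a non-positive value means "no limit".

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch, not Flink's BarrierBuffer. Assumptions: the limit is in bytes and
// a non-positive limit means "unbounded".
final class BoundedAlignmentBuffer {
    private final long maxBufferedBytes;
    private final Deque<byte[]> buffered = new ArrayDeque<>();
    private long bufferedBytes = 0;
    private long checkpointId = -1; // checkpoint currently being aligned, -1 if none

    BoundedAlignmentBuffer(long maxBufferedBytes) {
        this.maxBufferedBytes = maxBufferedBytes;
    }

    void startAlignment(long id) {
        checkpointId = id;
    }

    /**
     * Buffers data that arrived on a blocked channel. Returns the id of the checkpoint that
     * must be aborted because the limit was exceeded, or -1 if alignment may continue.
     */
    long bufferBlockedData(byte[] data) {
        if (checkpointId < 0) {
            throw new IllegalStateException("not aligning");
        }
        buffered.addLast(data);
        bufferedBytes += data.length;
        if (maxBufferedBytes > 0 && bufferedBytes > maxBufferedBytes) {
            long aborted = checkpointId;
            endAlignment();
            return aborted; // caller sends a cancellation barrier for `aborted` downstream
        }
        return -1;
    }

    /** Ends alignment and returns the number of buffers released. This sketch drops them. */
    int endAlignment() {
        int released = buffered.size();
        buffered.clear();
        bufferedBytes = 0;
        checkpointId = -1;
        return released;
    }

    long bufferedBytes() {
        return bufferedBytes;
    }
}
```

Returning the aborted checkpoint id instead of throwing lets the caller handle an oversized alignment like any other aborted alignment. The caller sends a cancellation barrier downstream and declines the checkpoint.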

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink backport

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2773.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2773
    
----
commit a1f028dee49928ada014632bb27216b36e30250e
Author: Stephan Ewen <se...@apache.org>
Date:   2016-10-23T16:41:32Z

    [FLINK-4984] [checkpointing] Add Cancellation Barriers as a way to signal aborted checkpoints

commit b643edf1ace88b34c9cea5e892c440ad114a46fe
Author: Stephan Ewen <se...@apache.org>
Date:   2016-11-03T14:28:15Z

    [FLINK-4975] [checkpointing] Add a limit for how much data may be buffered in alignment.
    
    If more data than the defined amount is buffered, the alignment is aborted and the checkpoint canceled.

commit 0d890024299aecd3279d9f033415a206622e0425
Author: Stephan Ewen <se...@apache.org>
Date:   2016-11-08T16:13:19Z

    [FLINK-4985] [checkpointing] Report canceled / declined checkpoints to the Checkpoint Coordinator

----

