[ https://issues.apache.org/jira/browse/FLINK-4975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15647917#comment-15647917 ]

ASF GitHub Bot commented on FLINK-4975:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2754#discussion_r87017313
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
    @@ -525,81 +527,63 @@ else if (!props.forceCheckpoint()) {
        }
     
        /**
    -    * Receives a {@link DeclineCheckpoint} message and returns whether the
    -    * message was associated with a pending checkpoint.
    +    * Receives a {@link DeclineCheckpoint} message for a pending checkpoint.
         *
         * @param message Checkpoint decline from the task manager
    -    *
    -    * @return Flag indicating whether the declined checkpoint was associated
    -    * with a pending checkpoint.
         */
    -   public boolean receiveDeclineMessage(DeclineCheckpoint message) throws Exception {
    +   public void receiveDeclineMessage(DeclineCheckpoint message) throws Exception {
                if (shutdown || message == null) {
    -                   return false;
    +                   return;
                }
                if (!job.equals(message.getJob())) {
    -                   LOG.error("Received DeclineCheckpoint message for wrong job: {}", message);
    -                   return false;
    +                   throw new IllegalArgumentException("Received DeclineCheckpoint message for job " +
    +                           message.getJob() + " while this coordinator handles job " + job);
                }
     
                final long checkpointId = message.getCheckpointId();
    +           final String reason = (message.getReason() != null ? message.getReason().getMessage() : "");
     
                PendingCheckpoint checkpoint;
     
    -           // Flag indicating whether the ack message was for a known pending
    -           // checkpoint.
    -           boolean isPendingCheckpoint;
    -
                synchronized (lock) {
                        // we need to check inside the lock for being shutdown as well, otherwise we
                        // get races and invalid error log messages
                        if (shutdown) {
    -                           return false;
    +                           return;
                        }
     
                        checkpoint = pendingCheckpoints.get(checkpointId);
     
                        if (checkpoint != null && !checkpoint.isDiscarded()) {
    -                           isPendingCheckpoint = true;
    -
    -                           LOG.info("Discarding checkpoint " + checkpointId
    -                                           + " because of checkpoint decline from task " + message.getTaskExecutionId());
    +                           LOG.info("Discarding checkpoint " + checkpointId + " because of checkpoint decline from task " +
    +                                           message.getTaskExecutionId() + " : " + reason);
     
                                pendingCheckpoints.remove(checkpointId);
                                checkpoint.abortDeclined();
                                rememberRecentCheckpointId(checkpointId);
     
    -                           boolean haveMoreRecentPending = false;
    -
    -                           for (PendingCheckpoint p : pendingCheckpoints.values()) {
    -                                   if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) {
    -                                           haveMoreRecentPending = true;
    -                                           break;
    -                                   }
    -                           }
    -                           if (!haveMoreRecentPending && !triggerRequestQueued) {
    -                                   LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId);
    -                                   triggerCheckpoint(System.currentTimeMillis(), checkpoint.getProps(), checkpoint.getTargetDirectory(), checkpoint.isPeriodic());
    -                           } else if (!haveMoreRecentPending) {
    -                                   LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId);
    -                                   triggerQueuedRequests();
    -                           }
    -                   } else if (checkpoint != null) {
    +                           // we don't have to schedule another "dissolving" checkpoint any more because the
    +                           // cancellation barriers take care of breaking downstream alignments
    +                           // we only need to make sure that suspended queued requests are resumed
    +                           triggerQueuedRequests();
    --- End diff --
    
    Noticed during backporting: This call to triggerQueuedRequests() should only happen if no newer pending checkpoint exists.
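
    To illustrate the remark above, here is a minimal, self-contained sketch of the guard. It is not the actual CheckpointCoordinator code: the class DeclineGuardSketch, the stand-in type Pending, and the counter resumedQueuedRequests (which replaces the real triggerQueuedRequests() call) are hypothetical names used only for this example. The scan for a newer pending checkpoint mirrors the loop that the diff removes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model for illustration only; NOT Flink's CheckpointCoordinator.
class DeclineGuardSketch {

    /** Stand-in for PendingCheckpoint, holding only the state the guard needs. */
    static final class Pending {
        final long timestamp;
        boolean discarded;

        Pending(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    private final Object lock = new Object();
    private final Map<Long, Pending> pendingCheckpoints = new HashMap<>();

    /** Counts resumptions of queued requests; stands in for triggerQueuedRequests(). */
    int resumedQueuedRequests;

    void addPending(long checkpointId, long timestamp) {
        synchronized (lock) {
            pendingCheckpoints.put(checkpointId, new Pending(timestamp));
        }
    }

    void receiveDecline(long checkpointId) {
        synchronized (lock) {
            Pending declined = pendingCheckpoints.get(checkpointId);
            if (declined == null || declined.discarded) {
                return; // unknown or already discarded checkpoint: nothing to do
            }

            pendingCheckpoints.remove(checkpointId);
            declined.discarded = true;

            // Look for a non-discarded pending checkpoint that is at least as recent.
            boolean haveMoreRecentPending = false;
            for (Pending p : pendingCheckpoints.values()) {
                if (!p.discarded && p.timestamp >= declined.timestamp) {
                    haveMoreRecentPending = true;
                    break;
                }
            }

            // Resume queued requests only if no newer pending checkpoint exists.
            if (!haveMoreRecentPending) {
                resumedQueuedRequests++;
            }
        }
    }

    public static void main(String[] args) {
        DeclineGuardSketch coordinator = new DeclineGuardSketch();
        coordinator.addPending(1L, 100L);
        coordinator.addPending(2L, 200L);

        coordinator.receiveDecline(1L); // checkpoint 2 is newer and still pending
        coordinator.receiveDecline(2L); // nothing newer is pending any more
        System.out.println("resumed queued requests: " + coordinator.resumedQueuedRequests);
    }
}
```

    In this sketch, declining checkpoint 1 while the newer checkpoint 2 is still pending leaves the queued requests alone; only the decline of checkpoint 2 resumes them.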


> Add a limit for how much data may be buffered during checkpoint alignment
> -------------------------------------------------------------------------
>
>                 Key: FLINK-4975
>                 URL: https://issues.apache.org/jira/browse/FLINK-4975
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.1.3
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.2.0, 1.1.4
>
>
> During checkpoint alignment, data may be buffered/spilled.
> We should introduce an upper limit for the spilled data volume. After 
> exceeding that limit, the checkpoint alignment should abort and the 
> checkpoint be canceled.
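
The limit described in the issue could be tracked roughly as follows. This is a minimal sketch under stated assumptions, not Flink's implementation: the class AlignmentLimitSketch, its methods, and the byte-based limit parameter are hypothetical names chosen for the example, and the actual spilling and checkpoint cancellation are left out.

```java
// Hypothetical illustration of an upper bound on data buffered during alignment.
// None of these names come from Flink; they are assumptions for this sketch.
class AlignmentLimitSketch {

    private final long maxBufferedBytes;
    private long bufferedBytes;
    private boolean aborted;

    AlignmentLimitSketch(long maxBufferedBytes) {
        if (maxBufferedBytes <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        this.maxBufferedBytes = maxBufferedBytes;
    }

    /**
     * Records that numBytes were buffered or spilled during alignment.
     *
     * @return true if alignment may continue, false if the limit was exceeded
     *         and the alignment (and with it the checkpoint) should be aborted
     */
    boolean onBuffered(long numBytes) {
        if (aborted) {
            return false;
        }
        bufferedBytes += numBytes;
        if (bufferedBytes > maxBufferedBytes) {
            aborted = true;
            return false;
        }
        return true;
    }

    boolean isAborted() {
        return aborted;
    }

    /** Resets the counter when a new alignment starts. */
    void startNewAlignment() {
        bufferedBytes = 0;
        aborted = false;
    }

    public static void main(String[] args) {
        AlignmentLimitSketch limit = new AlignmentLimitSketch(100);
        System.out.println(limit.onBuffered(60)); // within the limit
        System.out.println(limit.onBuffered(50)); // 110 bytes in total, over the limit
    }
}
```

A caller would check the return value after each buffered record and, once it is false, stop aligning and cancel the checkpoint.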



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
