[ https://issues.apache.org/jira/browse/FLINK-4975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15647917#comment-15647917 ]
ASF GitHub Bot commented on FLINK-4975: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2754#discussion_r87017313 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -525,81 +527,63 @@ else if (!props.forceCheckpoint()) { } /** - * Receives a {@link DeclineCheckpoint} message and returns whether the - * message was associated with a pending checkpoint. + * Receives a {@link DeclineCheckpoint} message for a pending checkpoint. * * @param message Checkpoint decline from the task manager - * - * @return Flag indicating whether the declined checkpoint was associated - * with a pending checkpoint. */ - public boolean receiveDeclineMessage(DeclineCheckpoint message) throws Exception { + public void receiveDeclineMessage(DeclineCheckpoint message) throws Exception { if (shutdown || message == null) { - return false; + return; } if (!job.equals(message.getJob())) { - LOG.error("Received DeclineCheckpoint message for wrong job: {}", message); - return false; + throw new IllegalArgumentException("Received DeclineCheckpoint message for job " + + message.getJob() + " while this coordinator handles job " + job); } final long checkpointId = message.getCheckpointId(); + final String reason = (message.getReason() != null ? message.getReason().getMessage() : ""); PendingCheckpoint checkpoint; - // Flag indicating whether the ack message was for a known pending - // checkpoint. 
- boolean isPendingCheckpoint; - synchronized (lock) { // we need to check inside the lock for being shutdown as well, otherwise we // get races and invalid error log messages if (shutdown) { - return false; + return; } checkpoint = pendingCheckpoints.get(checkpointId); if (checkpoint != null && !checkpoint.isDiscarded()) { - isPendingCheckpoint = true; - - LOG.info("Discarding checkpoint " + checkpointId - + " because of checkpoint decline from task " + message.getTaskExecutionId()); + LOG.info("Discarding checkpoint " + checkpointId + " because of checkpoint decline from task " + + message.getTaskExecutionId() + " : " + reason); pendingCheckpoints.remove(checkpointId); checkpoint.abortDeclined(); rememberRecentCheckpointId(checkpointId); - boolean haveMoreRecentPending = false; - - for (PendingCheckpoint p : pendingCheckpoints.values()) { - if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) { - haveMoreRecentPending = true; - break; - } - } - if (!haveMoreRecentPending && !triggerRequestQueued) { - LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId); - triggerCheckpoint(System.currentTimeMillis(), checkpoint.getProps(), checkpoint.getTargetDirectory(), checkpoint.isPeriodic()); - } else if (!haveMoreRecentPending) { - LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId); - triggerQueuedRequests(); - } - } else if (checkpoint != null) { + // we don't have to schedule another "dissolving" checkpoint any more because the + // cancellation barriers take care of breaking downstream alignments + // we only need to make sure that suspended queued requests are resumed + triggerQueuedRequests(); --- End diff -- Noticed during backporting: this (the `triggerQueuedRequests()` call) should only happen if no newer pending checkpoint exists.
> Add a limit for how much data may be buffered during checkpoint alignment > ------------------------------------------------------------------------- > > Key: FLINK-4975 > URL: https://issues.apache.org/jira/browse/FLINK-4975 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing > Affects Versions: 1.1.3 > Reporter: Stephan Ewen > Assignee: Stephan Ewen > Fix For: 1.2.0, 1.1.4 > > > During checkpoint alignment, data may be buffered/spilled. > We should introduce an upper limit for the spilled data volume. After > exceeding that limit, the checkpoint alignment should abort and the > checkpoint should be canceled. -- This message was sent by Atlassian JIRA (v6.3.4#6332)