Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2754#discussion_r87017313 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -525,81 +527,63 @@ else if (!props.forceCheckpoint()) { } /** - * Receives a {@link DeclineCheckpoint} message and returns whether the - * message was associated with a pending checkpoint. + * Receives a {@link DeclineCheckpoint} message for a pending checkpoint. * * @param message Checkpoint decline from the task manager - * - * @return Flag indicating whether the declined checkpoint was associated - * with a pending checkpoint. */ - public boolean receiveDeclineMessage(DeclineCheckpoint message) throws Exception { + public void receiveDeclineMessage(DeclineCheckpoint message) throws Exception { if (shutdown || message == null) { - return false; + return; } if (!job.equals(message.getJob())) { - LOG.error("Received DeclineCheckpoint message for wrong job: {}", message); - return false; + throw new IllegalArgumentException("Received DeclineCheckpoint message for job " + + message.getJob() + " while this coordinator handles job " + job); } final long checkpointId = message.getCheckpointId(); + final String reason = (message.getReason() != null ? message.getReason().getMessage() : ""); PendingCheckpoint checkpoint; - // Flag indicating whether the ack message was for a known pending - // checkpoint. 
- boolean isPendingCheckpoint; - synchronized (lock) { // we need to check inside the lock for being shutdown as well, otherwise we // get races and invalid error log messages if (shutdown) { - return false; + return; } checkpoint = pendingCheckpoints.get(checkpointId); if (checkpoint != null && !checkpoint.isDiscarded()) { - isPendingCheckpoint = true; - - LOG.info("Discarding checkpoint " + checkpointId - + " because of checkpoint decline from task " + message.getTaskExecutionId()); + LOG.info("Discarding checkpoint " + checkpointId + " because of checkpoint decline from task " + + message.getTaskExecutionId() + " : " + reason); pendingCheckpoints.remove(checkpointId); checkpoint.abortDeclined(); rememberRecentCheckpointId(checkpointId); - boolean haveMoreRecentPending = false; - - for (PendingCheckpoint p : pendingCheckpoints.values()) { - if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) { - haveMoreRecentPending = true; - break; - } - } - if (!haveMoreRecentPending && !triggerRequestQueued) { - LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId); - triggerCheckpoint(System.currentTimeMillis(), checkpoint.getProps(), checkpoint.getTargetDirectory(), checkpoint.isPeriodic()); - } else if (!haveMoreRecentPending) { - LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId); - triggerQueuedRequests(); - } - } else if (checkpoint != null) { + // we don't have to schedule another "dissolving" checkpoint any more because the + // cancellation barriers take care of breaking downstream alignments + // we only need to make sure that suspended queued requests are resumed + triggerQueuedRequests(); --- End diff -- Noticed during backporting: the call to `triggerQueuedRequests()` should only happen if no newer pending checkpoint exists, as the removed `haveMoreRecentPending` check previously ensured.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it enabled, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---