[ https://issues.apache.org/jira/browse/FLINK-4975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15648108#comment-15648108 ]
ASF GitHub Bot commented on FLINK-4975: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2754#discussion_r87034115 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -525,81 +527,63 @@ else if (!props.forceCheckpoint()) { } /** - * Receives a {@link DeclineCheckpoint} message and returns whether the - * message was associated with a pending checkpoint. + * Receives a {@link DeclineCheckpoint} message for a pending checkpoint. * * @param message Checkpoint decline from the task manager - * - * @return Flag indicating whether the declined checkpoint was associated - * with a pending checkpoint. */ - public boolean receiveDeclineMessage(DeclineCheckpoint message) throws Exception { + public void receiveDeclineMessage(DeclineCheckpoint message) throws Exception { if (shutdown || message == null) { - return false; + return; } if (!job.equals(message.getJob())) { - LOG.error("Received DeclineCheckpoint message for wrong job: {}", message); - return false; + throw new IllegalArgumentException("Received DeclineCheckpoint message for job " + + message.getJob() + " while this coordinator handles job " + job); } final long checkpointId = message.getCheckpointId(); + final String reason = (message.getReason() != null ? message.getReason().getMessage() : ""); PendingCheckpoint checkpoint; - // Flag indicating whether the ack message was for a known pending - // checkpoint. 
- boolean isPendingCheckpoint; - synchronized (lock) { // we need to check inside the lock for being shutdown as well, otherwise we // get races and invalid error log messages if (shutdown) { - return false; + return; } checkpoint = pendingCheckpoints.get(checkpointId); if (checkpoint != null && !checkpoint.isDiscarded()) { - isPendingCheckpoint = true; - - LOG.info("Discarding checkpoint " + checkpointId - + " because of checkpoint decline from task " + message.getTaskExecutionId()); + LOG.info("Discarding checkpoint " + checkpointId + " because of checkpoint decline from task " + --- End diff -- A parameterized logging statement using `{}` placeholders would be better. > Add a limit for how much data may be buffered during checkpoint alignment > ------------------------------------------------------------------------- > > Key: FLINK-4975 > URL: https://issues.apache.org/jira/browse/FLINK-4975 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing > Affects Versions: 1.1.3 > Reporter: Stephan Ewen > Assignee: Stephan Ewen > Fix For: 1.2.0, 1.1.4 > > > During checkpoint alignment, data may be buffered/spilled. > We should introduce an upper limit for the spilled data volume. After > exceeding that limit, the checkpoint alignment should abort and the > checkpoint should be canceled. -- This message was sent by Atlassian JIRA (v6.3.4#6332)