[
https://issues.apache.org/jira/browse/FLINK-5158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15708911#comment-15708911
]
ASF GitHub Bot commented on FLINK-5158:
---------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2873#discussion_r90258796
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
---
@@ -731,46 +700,100 @@ public boolean
receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws E
discardState(message.getState());
}
+
+ return true;
}
else if (checkpoint != null) {
// this should not happen
throw new IllegalStateException(
"Received message for discarded
but non-removed checkpoint " + checkpointId);
}
else {
+ boolean wasPendingCheckpoint;
+
// message is for an unknown checkpoint, or
comes too late (checkpoint disposed)
if
(recentPendingCheckpoints.contains(checkpointId)) {
- isPendingCheckpoint = true;
+ wasPendingCheckpoint = true;
LOG.warn("Received late message for now
expired checkpoint attempt {}.", checkpointId);
}
else {
LOG.debug("Received message for an
unknown checkpoint {}.", checkpointId);
- isPendingCheckpoint = false;
+ wasPendingCheckpoint = false;
}
// try to discard the state so that we don't
have lingering state lying around
discardState(message.getState());
+
+ return wasPendingCheckpoint;
+ }
+ }
+ }
+
+ private void completePendingCheckpoint(PendingCheckpoint
pendingCheckpoint) throws CheckpointException {
+ final long checkpointId = pendingCheckpoint.getCheckpointId();
+ CompletedCheckpoint completedCheckpoint = null;
+
+ try {
+ completedCheckpoint =
pendingCheckpoint.finalizeCheckpoint();
+
+
completedCheckpointStore.addCheckpoint(completedCheckpoint);
+
+ rememberRecentCheckpointId(checkpointId);
+
dropSubsumedCheckpoints(completedCheckpoint.getTimestamp());
+
+ onFullyAcknowledgedCheckpoint(completedCheckpoint);
+ } catch (Exception exception) {
+ // abort the current pending checkpoint if it has not
been discarded yet
+ if(!pendingCheckpoint.isDiscarded()) {
+ pendingCheckpoint.discard(userClassLoader);
+ }
+
+ if (completedCheckpoint != null) {
+ // we failed to store the completed checkpoint.
Let's clean up
+ final CompletedCheckpoint cc =
completedCheckpoint;
+
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+
cc.discard(userClassLoader);
+ } catch (Exception
nestedException) {
+ LOG.warn("Could not
properly discard completed checkpoint {}.", cc.getCheckpointID(),
nestedException);
+ }
+ }
+ });
}
+
+ throw new CheckpointException("Could not complete the
pending checkpoint " + checkpointId + '.', exception);
+ } finally {
+ pendingCheckpoints.remove(checkpointId);
+
+ triggerQueuedRequests();
+ }
+
+ LOG.info("Completed checkpoint {} (in {} ms).", checkpointId,
completedCheckpoint.getDuration());
+
+ if (LOG.isDebugEnabled()) {
--- End diff --
While rebasing, you have to make sure to copy the updated string builder here.
> Handle ZooKeeperCompletedCheckpointStore exceptions in CheckpointCoordinator
> ----------------------------------------------------------------------------
>
> Key: FLINK-5158
> URL: https://issues.apache.org/jira/browse/FLINK-5158
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.2.0, 1.1.3
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.1.4
>
>
> The checkpoint coordinator does not properly handle exceptions when trying to
> store completed checkpoints. As a result, completed checkpoints are not
> properly cleaned up and, even worse, the {{CheckpointCoordinator}} might get
> stuck and stop triggering checkpoints.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)