Repository: flink
Updated Branches:
  refs/heads/release-1.2 1ca57368a -> cc1ed221f


[FLINK-6328] [chkPts] Don't add savepoints to CompletedCheckpointStore

The lifecycle of savepoints is not managed by the CheckpointCoordinator; it is fully
in the hands of the user. Therefore, the CheckpointCoordinator cannot rely on them
when trying to recover from failures. E.g. a user moving a savepoint shortly before
a failure could completely break Flink's recovery mechanism, because Flink cannot
skip failed checkpoints when recovering.

Therefore, until Flink is able to skip failed checkpoints when recovering, we should
not add savepoints to the CompletedCheckpointStore, which is used to retrieve
checkpoints for recovery. Savepoints are distinguished from regular checkpoints on the
basis of their CheckpointProperties (CheckpointProperties.STANDARD_SAVEPOINT).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc1ed221
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc1ed221
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc1ed221

Branch: refs/heads/release-1.2
Commit: cc1ed221fd3ab5b535983a4c1c94bbdb93d71309
Parents: 1ca5736
Author: Till Rohrmann <[email protected]>
Authored: Mon May 22 17:41:14 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Tue May 23 10:51:18 2017 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  69 +++++-----
 .../checkpoint/CheckpointProperties.java        |   1 -
 .../checkpoint/CheckpointCoordinatorTest.java   | 130 +++++++++++++++++--
 3 files changed, 158 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cc1ed221/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index cb8417a..36feea8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -739,44 +740,54 @@ public class CheckpointCoordinator {
         */
        private void completePendingCheckpoint(PendingCheckpoint 
pendingCheckpoint) throws CheckpointException {
                final long checkpointId = pendingCheckpoint.getCheckpointId();
-               CompletedCheckpoint completedCheckpoint = null;
-
+               final CompletedCheckpoint completedCheckpoint;
+               
                try {
-                       completedCheckpoint = 
pendingCheckpoint.finalizeCheckpoint();
-
-                       
completedCheckpointStore.addCheckpoint(completedCheckpoint);
-
-                       rememberRecentCheckpointId(checkpointId);
-                       dropSubsumedCheckpoints(checkpointId);
-               } catch (Exception exception) {
-                       // abort the current pending checkpoint if it has not 
been discarded yet
-                       if (!pendingCheckpoint.isDiscarded()) {
-                               pendingCheckpoint.abortError(exception);
+                       try {
+                               completedCheckpoint = 
pendingCheckpoint.finalizeCheckpoint();
+                       } catch (Exception e1) {
+                               // abort the current pending checkpoint if we fail to finalize the pending checkpoint.
+                               if (!pendingCheckpoint.isDiscarded()) {
+                                       pendingCheckpoint.abortError(e1);
+                               }
+       
+                               throw new CheckpointException("Could not 
finalize the pending checkpoint " + checkpointId + '.', e1);
                        }
-
-                       if (completedCheckpoint != null) {
-                               // we failed to store the completed checkpoint. 
Let's clean up
-                               final CompletedCheckpoint cc = 
completedCheckpoint;
-
-                               executor.execute(new Runnable() {
-                                       @Override
-                                       public void run() {
-                                               try {
-                                                       cc.discard();
-                                               } catch (Exception 
nestedException) {
-                                                       LOG.warn("Could not 
properly discard completed checkpoint {}.", cc.getCheckpointID(), 
nestedException);
+               
+                       // the pending checkpoint must be discarded after the 
finalization
+                       
Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint 
!= null);
+
+                       // TODO: add savepoints to completed checkpoint store 
once FLINK-4815 has been completed
+                       if 
(!CheckpointProperties.isSavepoint(completedCheckpoint.getProperties())) {
+                               try {
+                                       
completedCheckpointStore.addCheckpoint(completedCheckpoint);
+                               } catch (Exception exception) {
+                                       // we failed to store the completed 
checkpoint. Let's clean up
+                                       executor.execute(new Runnable() {
+                                               @Override
+                                               public void run() {
+                                                       try {
+                                                               
completedCheckpoint.discard();
+                                                       } catch (Exception 
nestedException) {
+                                                               LOG.warn("Could 
not properly discard completed checkpoint {}.", 
completedCheckpoint.getCheckpointID(), nestedException);
+                                                       }
                                                }
-                                       }
-                               });
-                       }
+                                       });
+
+                                       throw new CheckpointException("Could 
not complete the pending checkpoint " + checkpointId + '.', exception);
+                               }
 
-                       throw new CheckpointException("Could not complete the 
pending checkpoint " + checkpointId + '.', exception);
+                               // drop those pending checkpoints that are prior to the completed one
+                               dropSubsumedCheckpoints(checkpointId);
+                       }
                } finally {
                        pendingCheckpoints.remove(checkpointId);
 
                        triggerQueuedRequests();
                }
-               
+
+               rememberRecentCheckpointId(checkpointId);
+
                // record the time when this was completed, to calculate
                // the 'min delay between checkpoints'
                lastCheckpointCompletionNanos = System.nanoTime();

http://git-wip-us.apache.org/repos/asf/flink/blob/cc1ed221/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index 4d8bab2..5dadb5d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -305,7 +305,6 @@ public class CheckpointProperties implements Serializable {
                        return EXTERNALIZED_CHECKPOINT_RETAINED;
                }
        }
-
        /**
         * Returns whether the checkpoint properties describe a standard 
savepoint.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/cc1ed221/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index eeab445..8b79423 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -96,7 +97,7 @@ import static org.mockito.Mockito.when;
 /**
  * Tests for the checkpoint coordinator.
  */
-public class CheckpointCoordinatorTest {
+public class CheckpointCoordinatorTest extends TestLogger {
 
        @Rule
        public TemporaryFolder tmpFolder = new TemporaryFolder();
@@ -1392,8 +1393,8 @@ public class CheckpointCoordinatorTest {
                assertTrue(pending.isDiscarded());
                assertTrue(savepointFuture.isDone());
 
-               // the now we should have a completed checkpoint
-               assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+               // now the savepoint should be completed but not added to the completed checkpoint store
+               assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
 
                // validate that the relevant tasks got a confirmation message
@@ -1402,7 +1403,7 @@ public class CheckpointCoordinatorTest {
                        verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).notifyCheckpointComplete(eq(checkpointId), eq(timestamp));
                }
 
-               CompletedCheckpoint success = 
coord.getSuccessfulCheckpoints().get(0);
+               CompletedCheckpoint success = savepointFuture.get();
                assertEquals(jid, success.getJobId());
                assertEquals(timestamp, success.getTimestamp());
                assertEquals(pending.getCheckpointId(), 
success.getCheckpointID());
@@ -1421,9 +1422,9 @@ public class CheckpointCoordinatorTest {
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointMetaDataNew));
 
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
-               assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+               assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
 
-               CompletedCheckpoint successNew = 
coord.getSuccessfulCheckpoints().get(0);
+               CompletedCheckpoint successNew = savepointFuture.get();
                assertEquals(jid, successNew.getJobId());
                assertEquals(timestampNew, successNew.getTimestamp());
                assertEquals(checkpointIdNew, successNew.getCheckpointID());
@@ -1446,7 +1447,7 @@ public class CheckpointCoordinatorTest {
         * Triggers a savepoint and two checkpoints. The second checkpoint 
completes
         * and subsumes the first checkpoint, but not the first savepoint. Then 
we
         * trigger another checkpoint and savepoint. The 2nd savepoint 
completes and
-        * subsumes the last checkpoint, but not the first savepoint.
+        * subsumes neither the last checkpoint nor the first savepoint.
         */
        @Test
        public void testSavepointsAreNotSubsumed() throws Exception {
@@ -1505,6 +1506,8 @@ public class CheckpointCoordinatorTest {
                assertFalse(savepointFuture1.isDone());
 
                assertTrue(coord.triggerCheckpoint(timestamp + 3, false));
+               long checkpointId3 = counter.getLast();
+               CheckpointMetaData checkpointMetaData3 = new 
CheckpointMetaData(checkpointId3, 0L);
                assertEquals(2, coord.getNumberOfPendingCheckpoints());
 
                Future<CompletedCheckpoint> savepointFuture2 = 
coord.triggerSavepoint(timestamp + 4, savepointDir);
@@ -1512,12 +1515,12 @@ public class CheckpointCoordinatorTest {
                CheckpointMetaData checkpointMetaDataS2 = new 
CheckpointMetaData(savepointId2, 0L);
                assertEquals(3, coord.getNumberOfPendingCheckpoints());
 
-               // 2nd savepoint should subsume the last checkpoint, but not 
the 1st savepoint
+               // 2nd savepoint should subsume neither the last checkpoint nor the 1st savepoint
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointMetaDataS2));
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointMetaDataS2));
 
-               assertEquals(1, coord.getNumberOfPendingCheckpoints());
-               assertEquals(2, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+               assertEquals(2, coord.getNumberOfPendingCheckpoints());
+               assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
                
assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
 
                assertFalse(savepointFuture1.isDone());
@@ -1527,9 +1530,15 @@ public class CheckpointCoordinatorTest {
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointMetaDataS1));
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointMetaDataS1));
 
-               assertEquals(0, coord.getNumberOfPendingCheckpoints());
-               assertEquals(3, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+               assertEquals(1, coord.getNumberOfPendingCheckpoints());
+               assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
                assertTrue(savepointFuture1.isDone());
+
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointMetaData3));
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointMetaData3));
+
+               assertEquals(0, coord.getNumberOfPendingCheckpoints());
+               assertEquals(2, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
        }
 
        private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
@@ -3035,4 +3044,101 @@ public class CheckpointCoordinatorTest {
                verify(tracker, times(1))
                        
.reportRestoredCheckpoint(any(RestoredCheckpointStats.class));
        }
+
+       /**
+        * FLINK-6328
+        *
+        * Tests that savepoints are not added to the {@link 
CompletedCheckpointStore} and,
+        * thus, are not subject to job recovery. The reason that we don't want 
that (until
+        * FLINK-4815 has been finished) is that the lifecycle of savepoints is 
not controlled
+        * by the {@link CheckpointCoordinator}.
+        */
+       @Test
+       public void testSavepointsAreNotAddedToCompletedCheckpointStore() 
throws Exception {
+               final JobID jobId = new JobID();
+               final ExecutionAttemptID executionAttemptId = new 
ExecutionAttemptID();
+               final ExecutionVertex vertex1 = 
mockExecutionVertex(executionAttemptId);
+               final CompletedCheckpointStore completedCheckpointStore = new 
StandaloneCompletedCheckpointStore(1);
+               final long checkpointTimestamp1 = 1L;
+               final long savepointTimestamp = 2L;
+               final long checkpointTimestamp2 = 3L;
+               final String savepointDir = 
tmpFolder.newFolder().getAbsolutePath();
+
+               final StandaloneCheckpointIDCounter checkpointIDCounter = new 
StandaloneCheckpointIDCounter();
+
+               CheckpointCoordinator checkpointCoordinator = new 
CheckpointCoordinator(
+                       jobId,
+                       600000L,
+                       600000L,
+                       0L,
+                       Integer.MAX_VALUE,
+                       ExternalizedCheckpointSettings.none(),
+                       new ExecutionVertex[]{vertex1},
+                       new ExecutionVertex[]{vertex1},
+                       new ExecutionVertex[]{vertex1},
+                       checkpointIDCounter,
+                       completedCheckpointStore,
+                       null,
+                       Executors.directExecutor());
+
+               // trigger a first checkpoint
+               assertTrue(
+                       "Triggering of a checkpoint should work.",
+                       
checkpointCoordinator.triggerCheckpoint(checkpointTimestamp1, false));
+
+               assertTrue(0 == 
completedCheckpointStore.getNumberOfRetainedCheckpoints());
+
+               CheckpointMetaData checkpointMetaData1 = new CheckpointMetaData(
+                       checkpointIDCounter.getLast(),
+                       0L);
+
+               // complete the 1st checkpoint
+               checkpointCoordinator.receiveAcknowledgeMessage(
+                       new AcknowledgeCheckpoint(
+                               jobId,
+                               executionAttemptId,
+                               checkpointMetaData1));
+
+               // check that the checkpoint has been completed
+               assertTrue(1 == 
completedCheckpointStore.getNumberOfRetainedCheckpoints());
+
+               // trigger a savepoint --> this should not have any effect on 
the CompletedCheckpointStore
+               Future<CompletedCheckpoint> savepointFuture = 
checkpointCoordinator.triggerSavepoint(savepointTimestamp, savepointDir);
+
+               CheckpointMetaData savepointMetaData1 = new CheckpointMetaData(
+                       checkpointIDCounter.getLast(),
+                       0L);
+
+               checkpointCoordinator.receiveAcknowledgeMessage(
+                       new AcknowledgeCheckpoint(
+                               jobId,
+                               executionAttemptId,
+                               savepointMetaData1));
+
+               // check that no errors occurred
+               final CompletedCheckpoint savepoint = savepointFuture.get();
+
+               assertFalse(
+                       "The savepoint should not have been added to the 
completed checkpoint store",
+                       savepoint.getCheckpointID() == 
completedCheckpointStore.getLatestCheckpoint().getCheckpointID());
+
+               assertTrue(
+                       "Triggering of a checkpoint should work.",
+                       
checkpointCoordinator.triggerCheckpoint(checkpointTimestamp2, false));
+
+               CheckpointMetaData checkpointMetaData2 = new CheckpointMetaData(
+                       checkpointIDCounter.getLast(),
+                       0L);
+
+               // complete the 2nd checkpoint
+               checkpointCoordinator.receiveAcknowledgeMessage(
+                       new AcknowledgeCheckpoint(
+                               jobId,
+                               executionAttemptId,
+                               checkpointMetaData2));
+
+               assertTrue(
+                       "The latest completed (proper) checkpoint should have 
been added to the completed checkpoint store.",
+                       
completedCheckpointStore.getLatestCheckpoint().getCheckpointID() == 
checkpointIDCounter.getLast());
+       }
 }

Reply via email to