dawidwys closed pull request #6704: [FLINK-10354] Revert "[FLINK-6328] [chkPts] 
Don't add savepoints to CompletedCheckpointStore"
URL: https://github.com/apache/flink/pull/6704
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/ops/state/savepoints.md b/docs/ops/state/savepoints.md
index 6dd5154c5e6..d31063ee2c5 100644
--- a/docs/ops/state/savepoints.md
+++ b/docs/ops/state/savepoints.md
@@ -106,6 +106,11 @@ Please follow <a href="https://issues.apache.org/jira/browse/FLINK-5778">FLINK-5
 
 Note that if you use the `MemoryStateBackend`, metadata *and* savepoint state 
will be stored in the `_metadata` file. Since it is self-contained, you may 
move the file and restore from any location.
 
+<div class="alert alert-warning">
+  <strong>Attention:</strong> It is discouraged to move or delete the last savepoint of a running job, because this might interfere with failure recovery. Savepoints have side effects on exactly-once sinks; therefore,
+  to ensure exactly-once semantics, if there is no checkpoint after the last savepoint, the savepoint will be used for recovery.
+</div>
+
 #### Trigger a Savepoint
 
 {% highlight shell %}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index e936b246222..57337b6286f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -839,28 +839,22 @@ private void completePendingCheckpoint(PendingCheckpoint 
pendingCheckpoint) thro
                        // the pending checkpoint must be discarded after the 
finalization
                        
Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint 
!= null);
 
-                       // TODO: add savepoints to completed checkpoint store 
once FLINK-4815 has been completed
-                       if (!completedCheckpoint.getProperties().isSavepoint()) 
{
-                               try {
-                                       
completedCheckpointStore.addCheckpoint(completedCheckpoint);
-                               } catch (Exception exception) {
-                                       // we failed to store the completed 
checkpoint. Let's clean up
-                                       executor.execute(new Runnable() {
-                                               @Override
-                                               public void run() {
-                                                       try {
-                                                               
completedCheckpoint.discardOnFailedStoring();
-                                                       } catch (Throwable t) {
-                                                               LOG.warn("Could 
not properly discard completed checkpoint {} of job {}.", 
completedCheckpoint.getCheckpointID(), job, t);
-                                                       }
+                       try {
+                               
completedCheckpointStore.addCheckpoint(completedCheckpoint);
+                       } catch (Exception exception) {
+                               // we failed to store the completed checkpoint. 
Let's clean up
+                               executor.execute(new Runnable() {
+                                       @Override
+                                       public void run() {
+                                               try {
+                                                       
completedCheckpoint.discardOnFailedStoring();
+                                               } catch (Throwable t) {
+                                                       LOG.warn("Could not 
properly discard completed checkpoint {}.", 
completedCheckpoint.getCheckpointID(), t);
                                                }
-                                       });
-
-                                       throw new CheckpointException("Could 
not complete the pending checkpoint " + checkpointId + '.', exception);
-                               }
+                                       }
+                               });
 
-                               // drop those pending checkpoints that are at 
prior to the completed one
-                               dropSubsumedCheckpoints(checkpointId);
+                               throw new CheckpointException("Could not 
complete the pending checkpoint " + checkpointId + '.', exception);
                        }
                } finally {
                        pendingCheckpoints.remove(checkpointId);
@@ -870,6 +864,9 @@ public void run() {
 
                rememberRecentCheckpointId(checkpointId);
 
+               // drop those pending checkpoints that are at prior to the 
completed one
+               dropSubsumedCheckpoints(checkpointId);
+
                // record the time when this was completed, to calculate
                // the 'min delay between checkpoints'
                lastCheckpointCompletionNanos = System.nanoTime();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index b113e12ef69..3650f43066d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -1494,8 +1494,8 @@ public void testTriggerAndConfirmSimpleSavepoint() throws 
Exception {
                assertTrue(pending.isDiscarded());
                assertTrue(savepointFuture.isDone());
 
-               // the now the savepoint should be completed but not added to 
the completed checkpoint store
-               assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+               // the now we should have a completed checkpoint
+               assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
 
                // validate that the relevant tasks got a confirmation message
@@ -1510,7 +1510,7 @@ public void testTriggerAndConfirmSimpleSavepoint() throws 
Exception {
                        verify(subtaskState2, 
times(1)).registerSharedStates(any(SharedStateRegistry.class));
                }
 
-               CompletedCheckpoint success = savepointFuture.get();
+               CompletedCheckpoint success = 
coord.getSuccessfulCheckpoints().get(0);
                assertEquals(jid, success.getJobId());
                assertEquals(timestamp, success.getTimestamp());
                assertEquals(pending.getCheckpointId(), 
success.getCheckpointID());
@@ -1528,9 +1528,9 @@ public void testTriggerAndConfirmSimpleSavepoint() throws 
Exception {
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointIdNew));
 
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
-               assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+               assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
 
-               CompletedCheckpoint successNew = savepointFuture.get();
+               CompletedCheckpoint successNew = 
coord.getSuccessfulCheckpoints().get(0);
                assertEquals(jid, successNew.getJobId());
                assertEquals(timestampNew, successNew.getTimestamp());
                assertEquals(checkpointIdNew, successNew.getCheckpointID());
@@ -1557,7 +1557,7 @@ public void testTriggerAndConfirmSimpleSavepoint() throws 
Exception {
         * Triggers a savepoint and two checkpoints. The second checkpoint 
completes
         * and subsumes the first checkpoint, but not the first savepoint. Then 
we
         * trigger another checkpoint and savepoint. The 2nd savepoint 
completes and
-        * does neither subsume the last checkpoint nor the first savepoint.
+        * subsumes the last checkpoint, but not the first savepoint.
         */
        @Test
        public void testSavepointsAreNotSubsumed() throws Exception {
@@ -1614,19 +1614,18 @@ public void testSavepointsAreNotSubsumed() throws 
Exception {
                assertFalse(savepointFuture1.isDone());
 
                assertTrue(coord.triggerCheckpoint(timestamp + 3, false));
-               long checkpointId3 = counter.getLast();
                assertEquals(2, coord.getNumberOfPendingCheckpoints());
 
                CompletableFuture<CompletedCheckpoint> savepointFuture2 = 
coord.triggerSavepoint(timestamp + 4, savepointDir);
                long savepointId2 = counter.getLast();
                assertEquals(3, coord.getNumberOfPendingCheckpoints());
 
-               // 2nd savepoint should not subsume the last checkpoint and the 
1st savepoint
+               // 2nd savepoint should subsume the last checkpoint, but not 
the 1st savepoint
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, savepointId2));
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, savepointId2));
 
-               assertEquals(2, coord.getNumberOfPendingCheckpoints());
-               assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+               assertEquals(1, coord.getNumberOfPendingCheckpoints());
+               assertEquals(2, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
                
assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
 
                assertFalse(savepointFuture1.isDone());
@@ -1636,15 +1635,9 @@ public void testSavepointsAreNotSubsumed() throws 
Exception {
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, savepointId1));
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, savepointId1));
 
-               assertEquals(1, coord.getNumberOfPendingCheckpoints());
-               assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
-               assertTrue(savepointFuture1.isDone());
-
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointId3));
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointId3));
-
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
-               assertEquals(2, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+               assertEquals(3, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+               assertTrue(savepointFuture1.isDone());
        }
 
        private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
@@ -3467,92 +3460,6 @@ public void testCheckpointStatsTrackerRestoreCallback() 
throws Exception {
                        
.reportRestoredCheckpoint(any(RestoredCheckpointStats.class));
        }
 
-       /**
-        * FLINK-6328
-        *
-        * Tests that savepoints are not added to the {@link 
CompletedCheckpointStore} and,
-        * thus, are not subject to job recovery. The reason that we don't want 
that (until
-        * FLINK-4815 has been finished) is that the lifecycle of savepoints is 
not controlled
-        * by the {@link CheckpointCoordinator}.
-        */
-       @Test
-       public void testSavepointsAreNotAddedToCompletedCheckpointStore() 
throws Exception {
-               final JobID jobId = new JobID();
-               final ExecutionAttemptID executionAttemptId = new 
ExecutionAttemptID();
-               final ExecutionVertex vertex1 = 
mockExecutionVertex(executionAttemptId);
-               final CompletedCheckpointStore completedCheckpointStore = new 
StandaloneCompletedCheckpointStore(1);
-               final long checkpointTimestamp1 = 1L;
-               final long savepointTimestamp = 2L;
-               final long checkpointTimestamp2 = 3L;
-               final String savepointDir = 
tmpFolder.newFolder().getAbsolutePath();
-
-               final StandaloneCheckpointIDCounter checkpointIDCounter = new 
StandaloneCheckpointIDCounter();
-
-               CheckpointCoordinator checkpointCoordinator = new 
CheckpointCoordinator(
-                       jobId,
-                       600000L,
-                       600000L,
-                       0L,
-                       Integer.MAX_VALUE,
-                       
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-                       new ExecutionVertex[]{vertex1},
-                       new ExecutionVertex[]{vertex1},
-                       new ExecutionVertex[]{vertex1},
-                       checkpointIDCounter,
-                       completedCheckpointStore,
-                       new MemoryStateBackend(),
-                       Executors.directExecutor(),
-                       SharedStateRegistry.DEFAULT_FACTORY);
-
-               // trigger a first checkpoint
-               assertTrue(
-                       "Triggering of a checkpoint should work.",
-                       
checkpointCoordinator.triggerCheckpoint(checkpointTimestamp1, false));
-
-               assertTrue(0 == 
completedCheckpointStore.getNumberOfRetainedCheckpoints());
-
-               // complete the 1st checkpoint
-               checkpointCoordinator.receiveAcknowledgeMessage(
-                       new AcknowledgeCheckpoint(
-                               jobId,
-                               executionAttemptId,
-                               checkpointIDCounter.getLast()));
-
-               // check that the checkpoint has been completed
-               assertTrue(1 == 
completedCheckpointStore.getNumberOfRetainedCheckpoints());
-
-               // trigger a savepoint --> this should not have any effect on 
the CompletedCheckpointStore
-               CompletableFuture<CompletedCheckpoint> savepointFuture = 
checkpointCoordinator.triggerSavepoint(savepointTimestamp, savepointDir);
-
-               checkpointCoordinator.receiveAcknowledgeMessage(
-                       new AcknowledgeCheckpoint(
-                               jobId,
-                               executionAttemptId,
-                               checkpointIDCounter.getLast()));
-
-               // check that no errors occurred
-               final CompletedCheckpoint savepoint = savepointFuture.get();
-
-               assertFalse(
-                       "The savepoint should not have been added to the 
completed checkpoint store",
-                       savepoint.getCheckpointID() == 
completedCheckpointStore.getLatestCheckpoint().getCheckpointID());
-
-               assertTrue(
-                       "Triggering of a checkpoint should work.",
-                       
checkpointCoordinator.triggerCheckpoint(checkpointTimestamp2, false));
-
-               // complete the 2nd checkpoint
-               checkpointCoordinator.receiveAcknowledgeMessage(
-                       new AcknowledgeCheckpoint(
-                               jobId,
-                               executionAttemptId,
-                               checkpointIDCounter.getLast()));
-
-               assertTrue(
-                       "The latest completed (proper) checkpoint should have 
been added to the completed checkpoint store.",
-                       
completedCheckpointStore.getLatestCheckpoint().getCheckpointID() == 
checkpointIDCounter.getLast());
-       }
-
        @Test
        public void testSharedStateRegistrationOnRestore() throws Exception {
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to