Repository: flink
Updated Branches:
  refs/heads/master 64bbf1d86 -> 5231c9300


[FLINK-7844] [ckPt] Fail unacknowledged pending checkpoints for fine-grained recovery

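This commit fails all pending checkpoints that have not been acknowledged by
the failed task in case of fine-grained recovery. Acknowledgements are tracked
per execution attempt, so a restarted attempt of the failed task can never
acknowledge these checkpoints; they could only expire via the checkpoint timeout.
Failing them eagerly avoids long checkpoint timeouts, which might otherwise block
the CheckpointCoordinator from triggering new checkpoints.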

This closes #4844.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5231c930
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5231c930
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5231c930

Branch: refs/heads/master
Commit: 5231c9300c26895118b3277bc833536e92dcc6d1
Parents: 64bbf1d
Author: Till <[email protected]>
Authored: Tue Oct 17 10:57:37 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Fri Oct 27 19:28:20 2017 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  90 ++++++++----
 .../runtime/checkpoint/PendingCheckpoint.java   |   4 +
 .../flink/runtime/executiongraph/Execution.java |   2 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   5 +
 .../IndividualRestartsConcurrencyTest.java      | 147 ++++++++++++++++++-
 5 files changed, 220 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5231c930/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 62d92ad..9a4456e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -724,31 +724,11 @@ public class CheckpointCoordinator {
                                return;
                        }
 
-                       checkpoint = pendingCheckpoints.get(checkpointId);
+                       checkpoint = pendingCheckpoints.remove(checkpointId);
 
                        if (checkpoint != null && !checkpoint.isDiscarded()) {
-                               LOG.info("Discarding checkpoint {} because of 
checkpoint decline from task {} : {}",
-                                               checkpointId, 
message.getTaskExecutionId(), reason);
-
-                               pendingCheckpoints.remove(checkpointId);
-                               checkpoint.abortDeclined();
-                               rememberRecentCheckpointId(checkpointId);
-
-                               // we don't have to schedule another 
"dissolving" checkpoint any more because the
-                               // cancellation barriers take care of breaking 
downstream alignments
-                               // we only need to make sure that suspended 
queued requests are resumed
-
-                               boolean haveMoreRecentPending = false;
-                               for (PendingCheckpoint p : 
pendingCheckpoints.values()) {
-                                       if (!p.isDiscarded() && 
p.getCheckpointId() >= checkpoint.getCheckpointId()) {
-                                               haveMoreRecentPending = true;
-                                               break;
-                                       }
-                               }
-
-                               if (!haveMoreRecentPending) {
-                                       triggerQueuedRequests();
-                               }
+                               LOG.info("Decline checkpoint {} by task {}.", 
checkpointId, message.getTaskExecutionId());
+                               discardCheckpoint(checkpoint, 
message.getReason());
                        }
                        else if (checkpoint != null) {
                                // this should not happen
@@ -893,10 +873,10 @@ public class CheckpointCoordinator {
                                if (!pendingCheckpoint.isDiscarded()) {
                                        pendingCheckpoint.abortError(e1);
                                }
-       
+
                                throw new CheckpointException("Could not 
finalize the pending checkpoint " + checkpointId + '.', e1);
                        }
-       
+
                        // the pending checkpoint must be discarded after the 
finalization
                        
Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint 
!= null);
 
@@ -928,7 +908,7 @@ public class CheckpointCoordinator {
 
                        triggerQueuedRequests();
                }
-               
+
                rememberRecentCheckpointId(checkpointId);
 
                // record the time when this was completed, to calculate
@@ -962,6 +942,28 @@ public class CheckpointCoordinator {
                }
        }
 
+       /**
+        * Fails all pending checkpoints which have not been acknowledged by 
the given execution
+        * attempt id.
+        *
+        * @param executionAttemptId for which to discard unaknowledged pending 
checkpoints
+        * @param cause of the failure
+        */
+       public void failUnacknowledgedPendingCheckpointsFor(ExecutionAttemptID 
executionAttemptId, Throwable cause) {
+               synchronized (lock) {
+                       Iterator<PendingCheckpoint> pendingCheckpointIterator = 
pendingCheckpoints.values().iterator();
+
+                       while (pendingCheckpointIterator.hasNext()) {
+                               final PendingCheckpoint pendingCheckpoint = 
pendingCheckpointIterator.next();
+
+                               if 
(!pendingCheckpoint.isAcknowledgedBy(executionAttemptId)) {
+                                       pendingCheckpointIterator.remove();
+                                       discardCheckpoint(pendingCheckpoint, 
cause);
+                               }
+                       }
+               }
+       }
+
        private void rememberRecentCheckpointId(long id) {
                if (recentPendingCheckpoints.size() >= 
NUM_GHOST_CHECKPOINT_IDS) {
                        recentPendingCheckpoints.removeFirst();
@@ -1270,6 +1272,42 @@ public class CheckpointCoordinator {
        }
 
        /**
+        * Discards the given pending checkpoint because of the given cause.
+        *
+        * @param pendingCheckpoint to discard
+        * @param cause for discarding the checkpoint
+        */
+       private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, 
@Nullable Throwable cause) {
+               assert(Thread.holdsLock(lock));
+               Preconditions.checkNotNull(pendingCheckpoint);
+
+               final long checkpointId = pendingCheckpoint.getCheckpointId();
+
+               final String reason = (cause != null) ? cause.getMessage() : "";
+
+               LOG.info("Discarding checkpoint {} because: {}", checkpointId, 
reason);
+
+               pendingCheckpoint.abortDeclined();
+               rememberRecentCheckpointId(checkpointId);
+
+               // we don't have to schedule another "dissolving" checkpoint 
any more because the
+               // cancellation barriers take care of breaking downstream 
alignments
+               // we only need to make sure that suspended queued requests are 
resumed
+
+               boolean haveMoreRecentPending = false;
+               for (PendingCheckpoint p : pendingCheckpoints.values()) {
+                       if (!p.isDiscarded() && p.getCheckpointId() >= 
pendingCheckpoint.getCheckpointId()) {
+                               haveMoreRecentPending = true;
+                               break;
+                       }
+               }
+
+               if (!haveMoreRecentPending) {
+                       triggerQueuedRequests();
+               }
+       }
+
+       /**
         * Discards the given state object asynchronously belonging to the 
given job, execution attempt
         * id and checkpoint id.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/5231c930/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 16231dd..a9b6d4d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -184,6 +184,10 @@ public class PendingCheckpoint {
                return this.notYetAcknowledgedTasks.isEmpty() && !discarded;
        }
 
+       public boolean isAcknowledgedBy(ExecutionAttemptID executionAttemptId) {
+               return !notYetAcknowledgedTasks.containsKey(executionAttemptId);
+       }
+
        public boolean isDiscarded() {
                return discarded;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5231c930/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 2074820..939c290 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -941,7 +941,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                                }
                                        } catch (Throwable tt) {
                                                // no reason this should ever 
happen, but log it to be safe
-                                               LOG.error("Error triggering 
cancel call while marking task as failed.", tt);
+                                               LOG.error("Error triggering 
cancel call while marking task {} as failed.", 
getVertex().getTaskNameWithSubtaskIndex(), tt);
                                        }
                                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5231c930/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f002c5c..dca6c44 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1710,6 +1710,11 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        if (execution.getGlobalModVersion() == 
globalModVersion) {
                                try {
                                        
failoverStrategy.onTaskFailure(execution, ex);
+
+                                       // fail all checkpoints which the 
failed task has not yet acknowledged
+                                       if (checkpointCoordinator != null) {
+                                               
checkpointCoordinator.failUnacknowledgedPendingCheckpointsFor(execution.getAttemptId(),
 ex);
+                                       }
                                }
                                catch (Throwable t) {
                                        // bug in the failover strategy - fall 
back to global failover

http://git-wip-us.apache.org/repos/asf/flink/blob/5231c930/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
index 33456f7..cb94d25 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
@@ -21,6 +21,13 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
 import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
@@ -33,17 +40,39 @@ import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionState;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * These tests make sure that global failover (restart all) always takes 
precedence over
@@ -52,7 +81,7 @@ import static org.junit.Assert.assertTrue;
  * <p>This test must be in the package it resides in, because it uses 
package-private methods
  * from the ExecutionGraph classes.
  */
-public class IndividualRestartsConcurrencyTest {
+public class IndividualRestartsConcurrencyTest extends TestLogger {
 
        /**
         * Tests that a cancellation concurrent to a local failover leads to a 
properly
@@ -261,6 +290,122 @@ public class IndividualRestartsConcurrencyTest {
                assertEquals(0, slotProvider.getNumberOfAvailableSlots());
        }
 
+       /**
+        * Tests that a local failure fails all pending checkpoints which have 
not been acknowledged by the failing
+        * task.
+        */
+       @Test
+       public void testLocalFailureFailsPendingCheckpoints() throws Exception {
+               final JobID jid = new JobID();
+               final int parallelism = 2;
+               final long verifyTimeout = 5000L;
+
+               final TaskManagerGateway taskManagerGateway = 
mock(TaskManagerGateway.class);
+               
when(taskManagerGateway.submitTask(any(TaskDeploymentDescriptor.class), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+               
when(taskManagerGateway.cancelTask(any(ExecutionAttemptID.class), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+
+               final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(jid, parallelism, taskManagerGateway);
+               final Executor executor = TestingUtils.defaultExecutor();
+
+
+               final CheckpointCoordinatorConfiguration 
checkpointCoordinatorConfiguration = new CheckpointCoordinatorConfiguration(
+                       10L,
+                       100000L,
+                       1L,
+                       3,
+                       ExternalizedCheckpointSettings.none(),
+                       true);
+
+               final ExecutionGraph graph = createSampleGraph(
+                       jid,
+                       new IndividualFailoverWithCustomExecutor(executor),
+                       slotProvider,
+                       parallelism);
+
+               final List<ExecutionJobVertex> allVertices = new 
ArrayList<>(graph.getAllVertices().values());
+
+               final StandaloneCheckpointIDCounter 
standaloneCheckpointIDCounter = new StandaloneCheckpointIDCounter();
+
+               graph.enableCheckpointing(
+                       
checkpointCoordinatorConfiguration.getCheckpointInterval(),
+                       
checkpointCoordinatorConfiguration.getCheckpointTimeout(),
+                       
checkpointCoordinatorConfiguration.getMinPauseBetweenCheckpoints(),
+                       
checkpointCoordinatorConfiguration.getMaxConcurrentCheckpoints(),
+                       
checkpointCoordinatorConfiguration.getExternalizedCheckpointSettings(),
+                       allVertices,
+                       allVertices,
+                       allVertices,
+                       Collections.emptyList(),
+                       standaloneCheckpointIDCounter,
+                       new StandaloneCompletedCheckpointStore(1),
+                       "",
+                       new MemoryStateBackend(),
+                       new CheckpointStatsTracker(
+                               1,
+                               allVertices,
+                               checkpointCoordinatorConfiguration,
+                               new UnregisteredTaskMetricsGroup()));
+
+               final CheckpointCoordinator checkpointCoordinator = 
graph.getCheckpointCoordinator();
+
+               final ExecutionJobVertex ejv = 
graph.getVerticesTopologically().iterator().next();
+               final ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
+               final ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
+
+               graph.scheduleForExecution();
+               assertEquals(JobStatus.RUNNING, graph.getState());
+
+               verify(taskManagerGateway, 
timeout(verifyTimeout).times(parallelism)).submitTask(any(TaskDeploymentDescriptor.class),
 any(Time.class));
+
+               // switch all executions to running
+               for (ExecutionVertex executionVertex : 
graph.getAllExecutionVertices()) {
+                       
executionVertex.getCurrentExecutionAttempt().switchToRunning();
+               }
+
+               // wait for a first checkpoint to be triggered
+               verify(taskManagerGateway, 
timeout(verifyTimeout).times(3)).triggerCheckpoint(
+                       eq(vertex1.getCurrentExecutionAttempt().getAttemptId()),
+                       any(JobID.class),
+                       anyLong(),
+                       anyLong(),
+                       any(CheckpointOptions.class));
+
+               verify(taskManagerGateway, 
timeout(verifyTimeout).times(3)).triggerCheckpoint(
+                       eq(vertex2.getCurrentExecutionAttempt().getAttemptId()),
+                       any(JobID.class),
+                       anyLong(),
+                       anyLong(),
+                       any(CheckpointOptions.class));
+
+               assertEquals(3, 
checkpointCoordinator.getNumberOfPendingCheckpoints());
+
+               long checkpointToAcknowledge = 
standaloneCheckpointIDCounter.getLast();
+
+               checkpointCoordinator.receiveAcknowledgeMessage(
+                       new AcknowledgeCheckpoint(
+                               graph.getJobID(),
+                               
vertex1.getCurrentExecutionAttempt().getAttemptId(),
+                               checkpointToAcknowledge));
+
+               Map<Long, PendingCheckpoint> oldPendingCheckpoints = new 
HashMap<>(3);
+
+               for (PendingCheckpoint pendingCheckpoint : 
checkpointCoordinator.getPendingCheckpoints().values()) {
+                       assertFalse(pendingCheckpoint.isDiscarded());
+                       
oldPendingCheckpoints.put(pendingCheckpoint.getCheckpointId(), 
pendingCheckpoint);
+               }
+
+               // let one of the vertices fail - this should trigger the 
failing of not acknowledged pending checkpoints
+               vertex1.getCurrentExecutionAttempt().fail(new Exception("test 
failure"));
+
+               for (PendingCheckpoint pendingCheckpoint : 
oldPendingCheckpoints.values()) {
+                       if (pendingCheckpoint.getCheckpointId() == 
checkpointToAcknowledge) {
+                               assertFalse(pendingCheckpoint.isDiscarded());
+                       } else {
+                               assertTrue(pendingCheckpoint.isDiscarded());
+                       }
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  utilities
        // 
------------------------------------------------------------------------

Reply via email to