Repository: flink Updated Branches: refs/heads/master 64bbf1d86 -> 5231c9300
[FLINK-7844] [ckPt] Fail unacknowledged pending checkpoints for fine grained recovery This commit will fail all pending checkpoints which have not been acknowledged by the failed task in case of fine grained recovery. This is done in order to avoid long checkpoint timeouts which might block the CheckpointCoordinator from triggering new checkpoints. This closes #4844. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5231c930 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5231c930 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5231c930 Branch: refs/heads/master Commit: 5231c9300c26895118b3277bc833536e92dcc6d1 Parents: 64bbf1d Author: Till <[email protected]> Authored: Tue Oct 17 10:57:37 2017 +0200 Committer: Till Rohrmann <[email protected]> Committed: Fri Oct 27 19:28:20 2017 +0200 ---------------------------------------------------------------------- .../checkpoint/CheckpointCoordinator.java | 90 ++++++++---- .../runtime/checkpoint/PendingCheckpoint.java | 4 + .../flink/runtime/executiongraph/Execution.java | 2 +- .../runtime/executiongraph/ExecutionGraph.java | 5 + .../IndividualRestartsConcurrencyTest.java | 147 ++++++++++++++++++- 5 files changed, 220 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5231c930/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 62d92ad..9a4456e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -724,31 +724,11 @@ public class CheckpointCoordinator { return; } - checkpoint = pendingCheckpoints.get(checkpointId); + checkpoint = pendingCheckpoints.remove(checkpointId); if (checkpoint != null && !checkpoint.isDiscarded()) { - LOG.info("Discarding checkpoint {} because of checkpoint decline from task {} : {}", - checkpointId, message.getTaskExecutionId(), reason); - - pendingCheckpoints.remove(checkpointId); - checkpoint.abortDeclined(); - rememberRecentCheckpointId(checkpointId); - - // we don't have to schedule another "dissolving" checkpoint any more because the - // cancellation barriers take care of breaking downstream alignments - // we only need to make sure that suspended queued requests are resumed - - boolean haveMoreRecentPending = false; - for (PendingCheckpoint p : pendingCheckpoints.values()) { - if (!p.isDiscarded() && p.getCheckpointId() >= checkpoint.getCheckpointId()) { - haveMoreRecentPending = true; - break; - } - } - - if (!haveMoreRecentPending) { - triggerQueuedRequests(); - } + LOG.info("Decline checkpoint {} by task {}.", checkpointId, message.getTaskExecutionId()); + discardCheckpoint(checkpoint, message.getReason()); } else if (checkpoint != null) { // this should not happen @@ -893,10 +873,10 @@ public class CheckpointCoordinator { if (!pendingCheckpoint.isDiscarded()) { pendingCheckpoint.abortError(e1); } - + throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.', e1); } - + // the pending checkpoint must be discarded after the finalization Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null); @@ -928,7 +908,7 @@ public class CheckpointCoordinator { triggerQueuedRequests(); } - + rememberRecentCheckpointId(checkpointId); // record the time when this was completed, to calculate @@ -962,6 +942,28 @@ public class CheckpointCoordinator { } } + /** + * Fails all 
pending checkpoints which have not been acknowledged by the given execution + * attempt id. + * + * @param executionAttemptId for which to discard unacknowledged pending checkpoints + * @param cause of the failure + */ + public void failUnacknowledgedPendingCheckpointsFor(ExecutionAttemptID executionAttemptId, Throwable cause) { + synchronized (lock) { + Iterator<PendingCheckpoint> pendingCheckpointIterator = pendingCheckpoints.values().iterator(); + + while (pendingCheckpointIterator.hasNext()) { + final PendingCheckpoint pendingCheckpoint = pendingCheckpointIterator.next(); + + if (!pendingCheckpoint.isAcknowledgedBy(executionAttemptId)) { + pendingCheckpointIterator.remove(); + discardCheckpoint(pendingCheckpoint, cause); + } + } + } + } + private void rememberRecentCheckpointId(long id) { if (recentPendingCheckpoints.size() >= NUM_GHOST_CHECKPOINT_IDS) { recentPendingCheckpoints.removeFirst(); @@ -1270,6 +1272,42 @@ public class CheckpointCoordinator { } /** + * Discards the given pending checkpoint because of the given cause. + * + * @param pendingCheckpoint to discard + * @param cause for discarding the checkpoint + */ + private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Throwable cause) { + assert(Thread.holdsLock(lock)); + Preconditions.checkNotNull(pendingCheckpoint); + + final long checkpointId = pendingCheckpoint.getCheckpointId(); + + final String reason = (cause != null) ? 
cause.getMessage() : ""; + + LOG.info("Discarding checkpoint {} because: {}", checkpointId, reason); + + pendingCheckpoint.abortDeclined(); + rememberRecentCheckpointId(checkpointId); + + // we don't have to schedule another "dissolving" checkpoint any more because the + // cancellation barriers take care of breaking downstream alignments + // we only need to make sure that suspended queued requests are resumed + + boolean haveMoreRecentPending = false; + for (PendingCheckpoint p : pendingCheckpoints.values()) { + if (!p.isDiscarded() && p.getCheckpointId() >= pendingCheckpoint.getCheckpointId()) { + haveMoreRecentPending = true; + break; + } + } + + if (!haveMoreRecentPending) { + triggerQueuedRequests(); + } + } + + /** * Discards the given state object asynchronously belonging to the given job, execution attempt * id and checkpoint id. * http://git-wip-us.apache.org/repos/asf/flink/blob/5231c930/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 16231dd..a9b6d4d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -184,6 +184,10 @@ public class PendingCheckpoint { return this.notYetAcknowledgedTasks.isEmpty() && !discarded; } + public boolean isAcknowledgedBy(ExecutionAttemptID executionAttemptId) { + return !notYetAcknowledgedTasks.containsKey(executionAttemptId); + } + public boolean isDiscarded() { return discarded; } http://git-wip-us.apache.org/repos/asf/flink/blob/5231c930/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 2074820..939c290 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -941,7 +941,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution } } catch (Throwable tt) { // no reason this should ever happen, but log it to be safe - LOG.error("Error triggering cancel call while marking task as failed.", tt); + LOG.error("Error triggering cancel call while marking task {} as failed.", getVertex().getTaskNameWithSubtaskIndex(), tt); } } http://git-wip-us.apache.org/repos/asf/flink/blob/5231c930/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index f002c5c..dca6c44 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -1710,6 +1710,11 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive if (execution.getGlobalModVersion() == globalModVersion) { try { failoverStrategy.onTaskFailure(execution, ex); + + // fail all checkpoints which the failed task has not yet acknowledged + if (checkpointCoordinator != null) { + checkpointCoordinator.failUnacknowledgedPendingCheckpointsFor(execution.getAttemptId(), ex); + } } catch (Throwable t) { // bug in the failover strategy - fall back to global failover 
http://git-wip-us.apache.org/repos/asf/flink/blob/5231c930/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java index 33456f7..cb94d25 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java @@ -21,6 +21,13 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.PendingCheckpoint; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; @@ -33,17 +40,39 @@ import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import 
org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; import org.junit.Test; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionState; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * These tests make sure that global failover (restart all) always takes precedence over @@ -52,7 +81,7 @@ import static org.junit.Assert.assertTrue; * <p>This test must be in the package it resides in, because it uses package-private methods * from the ExecutionGraph classes. 
*/ -public class IndividualRestartsConcurrencyTest { +public class IndividualRestartsConcurrencyTest extends TestLogger { /** * Tests that a cancellation concurrent to a local failover leads to a properly @@ -261,6 +290,122 @@ public class IndividualRestartsConcurrencyTest { assertEquals(0, slotProvider.getNumberOfAvailableSlots()); } + /** + * Tests that a local failure fails all pending checkpoints which have not been acknowledged by the failing + * task. + */ + @Test + public void testLocalFailureFailsPendingCheckpoints() throws Exception { + final JobID jid = new JobID(); + final int parallelism = 2; + final long verifyTimeout = 5000L; + + final TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class); + when(taskManagerGateway.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + when(taskManagerGateway.cancelTask(any(ExecutionAttemptID.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + + final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, parallelism, taskManagerGateway); + final Executor executor = TestingUtils.defaultExecutor(); + + + final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = new CheckpointCoordinatorConfiguration( + 10L, + 100000L, + 1L, + 3, + ExternalizedCheckpointSettings.none(), + true); + + final ExecutionGraph graph = createSampleGraph( + jid, + new IndividualFailoverWithCustomExecutor(executor), + slotProvider, + parallelism); + + final List<ExecutionJobVertex> allVertices = new ArrayList<>(graph.getAllVertices().values()); + + final StandaloneCheckpointIDCounter standaloneCheckpointIDCounter = new StandaloneCheckpointIDCounter(); + + graph.enableCheckpointing( + checkpointCoordinatorConfiguration.getCheckpointInterval(), + checkpointCoordinatorConfiguration.getCheckpointTimeout(), + checkpointCoordinatorConfiguration.getMinPauseBetweenCheckpoints(), + 
checkpointCoordinatorConfiguration.getMaxConcurrentCheckpoints(), + checkpointCoordinatorConfiguration.getExternalizedCheckpointSettings(), + allVertices, + allVertices, + allVertices, + Collections.emptyList(), + standaloneCheckpointIDCounter, + new StandaloneCompletedCheckpointStore(1), + "", + new MemoryStateBackend(), + new CheckpointStatsTracker( + 1, + allVertices, + checkpointCoordinatorConfiguration, + new UnregisteredTaskMetricsGroup())); + + final CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator(); + + final ExecutionJobVertex ejv = graph.getVerticesTopologically().iterator().next(); + final ExecutionVertex vertex1 = ejv.getTaskVertices()[0]; + final ExecutionVertex vertex2 = ejv.getTaskVertices()[1]; + + graph.scheduleForExecution(); + assertEquals(JobStatus.RUNNING, graph.getState()); + + verify(taskManagerGateway, timeout(verifyTimeout).times(parallelism)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); + + // switch all executions to running + for (ExecutionVertex executionVertex : graph.getAllExecutionVertices()) { + executionVertex.getCurrentExecutionAttempt().switchToRunning(); + } + + // wait for a first checkpoint to be triggered + verify(taskManagerGateway, timeout(verifyTimeout).times(3)).triggerCheckpoint( + eq(vertex1.getCurrentExecutionAttempt().getAttemptId()), + any(JobID.class), + anyLong(), + anyLong(), + any(CheckpointOptions.class)); + + verify(taskManagerGateway, timeout(verifyTimeout).times(3)).triggerCheckpoint( + eq(vertex2.getCurrentExecutionAttempt().getAttemptId()), + any(JobID.class), + anyLong(), + anyLong(), + any(CheckpointOptions.class)); + + assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints()); + + long checkpointToAcknowledge = standaloneCheckpointIDCounter.getLast(); + + checkpointCoordinator.receiveAcknowledgeMessage( + new AcknowledgeCheckpoint( + graph.getJobID(), + vertex1.getCurrentExecutionAttempt().getAttemptId(), + checkpointToAcknowledge)); + + 
Map<Long, PendingCheckpoint> oldPendingCheckpoints = new HashMap<>(3); + + for (PendingCheckpoint pendingCheckpoint : checkpointCoordinator.getPendingCheckpoints().values()) { + assertFalse(pendingCheckpoint.isDiscarded()); + oldPendingCheckpoints.put(pendingCheckpoint.getCheckpointId(), pendingCheckpoint); + } + + // let one of the vertices fail - this should trigger the failing of not acknowledged pending checkpoints + vertex1.getCurrentExecutionAttempt().fail(new Exception("test failure")); + + for (PendingCheckpoint pendingCheckpoint : oldPendingCheckpoints.values()) { + if (pendingCheckpoint.getCheckpointId() == checkpointToAcknowledge) { + assertFalse(pendingCheckpoint.isDiscarded()); + } else { + assertTrue(pendingCheckpoint.isDiscarded()); + } + } + } + // ------------------------------------------------------------------------ // utilities // ------------------------------------------------------------------------
