This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push: new ca837bd [FLINK-13281][tests] Improve test for aborting pending checkpoints on failover ca837bd is described below commit ca837bd8d467975df9101f6ea5c19cb48cef5ac5 Author: Yun Tang <myas...@live.com> AuthorDate: Thu Jul 18 17:59:02 2019 +0800 [FLINK-13281][tests] Improve test for aborting pending checkpoints on failover --- ...egionStrategyNGAbortPendingCheckpointsTest.java | 47 +++++++++++++++------- 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java index 899d490..acfb396 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java @@ -30,13 +30,14 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAda import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; -import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.util.TestLogger; @@ -51,8 +52,8 @@ import java.util.List; import static org.apache.flink.util.Preconditions.checkState; import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; /** @@ -76,27 +77,41 @@ public class AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest final ExecutionGraph executionGraph = createExecutionGraph(jobGraph); final Iterator<ExecutionVertex> vertexIterator = executionGraph.getAllExecutionVertices().iterator(); - final ExecutionVertex onlyExecutionVertex = vertexIterator.next(); + final ExecutionVertex firstExecutionVertex = vertexIterator.next(); - setTaskRunning(executionGraph, onlyExecutionVertex); + setTasksRunning(executionGraph, firstExecutionVertex, vertexIterator.next()); final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); checkState(checkpointCoordinator != null); checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(), false); - final int pendingCheckpointsBeforeFailure = checkpointCoordinator.getNumberOfPendingCheckpoints(); + assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints()); + long checkpointId = checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next(); - failVertex(onlyExecutionVertex); + AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( + jobGraph.getJobID(), + firstExecutionVertex.getCurrentExecutionAttempt().getAttemptId(), + checkpointId); + + // let the first vertex acknowledge the checkpoint, and fail it afterwards + // the failover 
strategy should then cancel all pending checkpoints on restart + checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "Unknown location"); + assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints()); + + failVertex(firstExecutionVertex); + assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints()); + manualMainThreadExecutor.triggerScheduledTasks(); - assertThat(pendingCheckpointsBeforeFailure, is(equalTo(1))); assertNoPendingCheckpoints(checkpointCoordinator); } - private void setTaskRunning(final ExecutionGraph executionGraph, final ExecutionVertex executionVertex) { - executionGraph.updateState( - new TaskExecutionState(executionGraph.getJobID(), - executionVertex.getCurrentExecutionAttempt().getAttemptId(), - ExecutionState.RUNNING)); + private void setTasksRunning(final ExecutionGraph executionGraph, final ExecutionVertex... executionVertices) { + for (ExecutionVertex executionVertex : executionVertices) { + executionGraph.updateState( + new TaskExecutionState(executionGraph.getJobID(), + executionVertex.getCurrentExecutionAttempt().getAttemptId(), + ExecutionState.RUNNING)); + } } private void failVertex(final ExecutionVertex onlyExecutionVertex) { @@ -106,9 +121,11 @@ public class AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest private static JobGraph createStreamingJobGraph() { final JobVertex v1 = new JobVertex("vertex1"); + final JobVertex v2 = new JobVertex("vertex2"); v1.setInvokableClass(AbstractInvokable.class); + v2.setInvokableClass(AbstractInvokable.class); - final JobGraph jobGraph = new JobGraph(v1); + final JobGraph jobGraph = new JobGraph(v1, v2); jobGraph.setScheduleMode(ScheduleMode.EAGER); return jobGraph; @@ -116,9 +133,9 @@ public class AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception { final ExecutionGraph executionGraph = new 
ExecutionGraphTestUtils.TestingExecutionGraphBuilder(jobGraph) - .setRestartStrategy(new InfiniteDelayRestartStrategy(10)) + .setRestartStrategy(new FixedDelayRestartStrategy(10, 0)) .setFailoverStrategyFactory(AdaptedRestartPipelinedRegionStrategyNG::new) - .setSlotProvider(new SimpleSlotProvider(jobGraph.getJobID(), 1)) + .setSlotProvider(new SimpleSlotProvider(jobGraph.getJobID(), 2)) .build(); enableCheckpointing(executionGraph);