This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new ca837bd  [FLINK-13281][tests] Improve test for aborting pending checkpoints on failover
ca837bd is described below

commit ca837bd8d467975df9101f6ea5c19cb48cef5ac5
Author: Yun Tang <myas...@live.com>
AuthorDate: Thu Jul 18 17:59:02 2019 +0800

    [FLINK-13281][tests] Improve test for aborting pending checkpoints on failover
---
 ...egionStrategyNGAbortPendingCheckpointsTest.java | 47 +++++++++++++++-------
 1 file changed, 32 insertions(+), 15 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java
index 899d490..acfb396 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java
@@ -30,13 +30,14 @@ import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAda
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
-import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.TestLogger;
@@ -51,8 +52,8 @@ import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -76,27 +77,41 @@ public class 
AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest
                final ExecutionGraph executionGraph = 
createExecutionGraph(jobGraph);
 
                final Iterator<ExecutionVertex> vertexIterator = 
executionGraph.getAllExecutionVertices().iterator();
-               final ExecutionVertex onlyExecutionVertex = 
vertexIterator.next();
+               final ExecutionVertex firstExecutionVertex = 
vertexIterator.next();
 
-               setTaskRunning(executionGraph, onlyExecutionVertex);
+               setTasksRunning(executionGraph, firstExecutionVertex, 
vertexIterator.next());
 
                final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
                checkState(checkpointCoordinator != null);
 
                
checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(),  false);
-               final int pendingCheckpointsBeforeFailure = 
checkpointCoordinator.getNumberOfPendingCheckpoints();
+               assertEquals(1, 
checkpointCoordinator.getNumberOfPendingCheckpoints());
+               long checkpointId = 
checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
 
-               failVertex(onlyExecutionVertex);
+               AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
+                       jobGraph.getJobID(),
+                       
firstExecutionVertex.getCurrentExecutionAttempt().getAttemptId(),
+                       checkpointId);
+
+               // let the first vertex acknowledge the checkpoint, and fail it 
afterwards
+               // the failover strategy should then cancel all pending 
checkpoints on restart
+               
checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "Unknown 
location");
+               assertEquals(1, 
checkpointCoordinator.getNumberOfPendingCheckpoints());
+
+               failVertex(firstExecutionVertex);
+               assertEquals(1, 
checkpointCoordinator.getNumberOfPendingCheckpoints());
+               manualMainThreadExecutor.triggerScheduledTasks();
 
-               assertThat(pendingCheckpointsBeforeFailure, is(equalTo(1)));
                assertNoPendingCheckpoints(checkpointCoordinator);
        }
 
-       private void setTaskRunning(final ExecutionGraph executionGraph, final 
ExecutionVertex executionVertex) {
-               executionGraph.updateState(
-                       new TaskExecutionState(executionGraph.getJobID(),
-                               
executionVertex.getCurrentExecutionAttempt().getAttemptId(),
-                               ExecutionState.RUNNING));
+       private void setTasksRunning(final ExecutionGraph executionGraph, final 
ExecutionVertex... executionVertices) {
+               for (ExecutionVertex executionVertex : executionVertices) {
+                       executionGraph.updateState(
+                               new 
TaskExecutionState(executionGraph.getJobID(),
+                                       
executionVertex.getCurrentExecutionAttempt().getAttemptId(),
+                                       ExecutionState.RUNNING));
+               }
        }
 
        private void failVertex(final ExecutionVertex onlyExecutionVertex) {
@@ -106,9 +121,11 @@ public class 
AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest
 
        private static JobGraph createStreamingJobGraph() {
                final JobVertex v1 = new JobVertex("vertex1");
+               final JobVertex v2 = new JobVertex("vertex2");
                v1.setInvokableClass(AbstractInvokable.class);
+               v2.setInvokableClass(AbstractInvokable.class);
 
-               final JobGraph jobGraph = new JobGraph(v1);
+               final JobGraph jobGraph = new JobGraph(v1, v2);
                jobGraph.setScheduleMode(ScheduleMode.EAGER);
 
                return jobGraph;
@@ -116,9 +133,9 @@ public class 
AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest
 
        private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) 
throws Exception {
                final ExecutionGraph executionGraph = new 
ExecutionGraphTestUtils.TestingExecutionGraphBuilder(jobGraph)
-                       .setRestartStrategy(new 
InfiniteDelayRestartStrategy(10))
+                       .setRestartStrategy(new FixedDelayRestartStrategy(10, 
0))
                        
.setFailoverStrategyFactory(AdaptedRestartPipelinedRegionStrategyNG::new)
-                       .setSlotProvider(new 
SimpleSlotProvider(jobGraph.getJobID(), 1))
+                       .setSlotProvider(new 
SimpleSlotProvider(jobGraph.getJobID(), 2))
                        .build();
 
                enableCheckpointing(executionGraph);

Reply via email to