Repository: flink Updated Branches: refs/heads/master 1ac3e05f9 -> 8198967ea
[FLINK-7960] [tests] Fix race conditions in ExecutionGraphRestartTest#completeCancellingForAllVertices One race condition is between waitUntilJobStatus(eg, JobStatus.FAILING, 1000) and the subsequent completeCancellingForAllVertices where not all executions are in state CANCELLING. The other race condition is between completeCancellingForAllVertices and the fixed delay restart without a delay. The problem is that the 10th task could have failed. In order to restart we would have to complete the cancel for the first 9 tasks. This is enough for the restart strategy to restart the job. If this happens before completeCancellingForAllVertices has also cancelled the execution of the 10th task, it could happen that we cancel a fresh execution. [hotfix] Make WaitForTasks use an AtomicInteger [hotfix] Set optCancelCondition to Optional.empty() in SimpleAckingTaskManagerGateway Add assertion message to ExecutionGraphTestUtils#switchToRunning This closes #4933. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f9b475f5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f9b475f5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f9b475f5 Branch: refs/heads/master Commit: f9b475f5528ed6a85e27ad167086f98c1121ae20 Parents: 1ac3e05 Author: Till Rohrmann <[email protected]> Authored: Wed Nov 1 16:53:14 2017 +0100 Committer: Till Rohrmann <[email protected]> Committed: Thu Nov 2 15:01:37 2017 +0100 ---------------------------------------------------------------------- .../flink/runtime/executiongraph/Execution.java | 2 +- .../ExecutionGraphRestartTest.java | 21 +++++++++++++++----- .../ExecutionGraphSuspendTest.java | 3 ++- .../executiongraph/ExecutionGraphTestUtils.java | 3 ++- .../ExecutionGraphVariousFailuesTest.java | 3 ++- .../ExecutionVertexCancelTest.java | 3 ++- .../executiongraph/GlobalModVersionTest.java | 3 ++- .../PipelinedRegionFailoverConcurrencyTest.java | 3 ++- 
.../utils/SimpleAckingTaskManagerGateway.java | 8 ++++++++ 9 files changed, 37 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f9b475f5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 939c290..c1f423b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -844,7 +844,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution // failing in the meantime may happen and is no problem. // anything else is a serious problem !!! if (current != FAILED) { - String message = String.format("Asynchronous race: Found state %s after successful cancel call.", state); + String message = String.format("Asynchronous race: Found %s in state %s after successful cancel call.", vertex.getTaskNameWithSubtaskIndex(), state); LOG.error(message); vertex.getExecutionGraph().failGlobal(new Exception(message)); } http://git-wip-us.apache.org/repos/asf/flink/blob/f9b475f5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index acf854f..8770b06 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -76,6 +76,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway; @@ -589,15 +590,19 @@ public class ExecutionGraphRestartTest extends TestLogger { public void testConcurrentLocalFailAndRestart() throws Exception { final int parallelism = 10; SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); + final OneShotLatch restartLatch = new OneShotLatch(); + final TriggeredRestartStrategy triggeredRestartStrategy = new TriggeredRestartStrategy(restartLatch); final ExecutionGraph eg = createSimpleTestGraph( new JobID(), taskManagerGateway, - new FixedDelayRestartStrategy(10, 0L), + triggeredRestartStrategy, createNoOpVertex(parallelism)); WaitForTasks waitForTasks = new WaitForTasks(parallelism); + WaitForTasks waitForTasksCancelled = new WaitForTasks(parallelism); taskManagerGateway.setCondition(waitForTasks); + taskManagerGateway.setCancelCondition(waitForTasksCancelled); eg.setScheduleMode(ScheduleMode.EAGER); eg.scheduleForExecution(); @@ -648,8 +653,15 @@ public class ExecutionGraphRestartTest extends TestLogger { WaitForTasks waitForTasksAfterRestart = new WaitForTasks(parallelism); taskManagerGateway.setCondition(waitForTasksAfterRestart); + waitForTasksCancelled.getFuture().get(1000L, TimeUnit.MILLISECONDS); + completeCancellingForAllVertices(eg); + // block the restart until we have completed for all vertices the cancellation + // otherwise it might happen that the last vertex which failed will have a new + // execution set due to restart which is wrongly canceled + restartLatch.trigger(); + waitUntilJobStatus(eg, 
JobStatus.RUNNING, 1000); waitForTasksAfterRestart.getFuture().get(1000, TimeUnit.MILLISECONDS); @@ -1048,11 +1060,12 @@ public class ExecutionGraphRestartTest extends TestLogger { private final int tasksToWaitFor; private final CompletableFuture<Boolean> allTasksReceived; - private int counter; + private final AtomicInteger counter; public WaitForTasks(int tasksToWaitFor) { this.tasksToWaitFor = tasksToWaitFor; this.allTasksReceived = new CompletableFuture<>(); + this.counter = new AtomicInteger(); } public CompletableFuture<Boolean> getFuture() { @@ -1061,9 +1074,7 @@ public class ExecutionGraphRestartTest extends TestLogger { @Override public void accept(ExecutionAttemptID executionAttemptID) { - counter++; - - if (counter >= tasksToWaitFor) { + if (counter.incrementAndGet() >= tasksToWaitFor) { allTasksReceived.complete(true); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f9b475f5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java index b3a8c33..f0adc32 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -45,7 +46,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; /** * Validates that suspending out of various states works correctly. 
*/ -public class ExecutionGraphSuspendTest { +public class ExecutionGraphSuspendTest extends TestLogger { /** * Going into SUSPENDED out of CREATED should immediately cancel everything and http://git-wip-us.apache.org/repos/asf/flink/blob/f9b475f5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 017e85f..b534ade 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -202,7 +202,8 @@ public class ExecutionGraphTestUtils { // check that all execution are in state DEPLOYING for (ExecutionVertex ev : eg.getAllExecutionVertices()) { final Execution exec = ev.getCurrentExecutionAttempt(); - assert(exec.getState() == ExecutionState.DEPLOYING); + final ExecutionState executionState = exec.getState(); + assert executionState == ExecutionState.DEPLOYING : "Expected executionState to be DEPLOYING, was: " + executionState; } // switch executions to RUNNING http://git-wip-us.apache.org/repos/asf/flink/blob/f9b475f5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java index 0797ef9..e23d495 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrat import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -30,7 +31,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; -public class ExecutionGraphVariousFailuesTest { +public class ExecutionGraphVariousFailuesTest extends TestLogger { /** * Test that failing in state restarting will retrigger the restarting logic. This means that http://git-wip-us.apache.org/repos/asf/flink/blob/f9b475f5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java index fe6c3cd..44e1794 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java @@ -41,13 +41,14 @@ import org.apache.flink.runtime.messages.TaskMessages.SubmitTask; import org.apache.flink.runtime.messages.TaskMessages.CancelTask; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.util.TestLogger; import org.junit.Test; import scala.concurrent.ExecutionContext; @SuppressWarnings("serial") -public class ExecutionVertexCancelTest { +public class ExecutionVertexCancelTest extends TestLogger { // 
-------------------------------------------------------------------------------------------- // Canceling in different states http://git-wip-us.apache.org/repos/asf/flink/blob/f9b475f5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java index d8f0309..534c33d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -42,7 +43,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -public class GlobalModVersionTest { +public class GlobalModVersionTest extends TestLogger { /** * Tests that failures during a global cancellation are not handed to the local http://git-wip-us.apache.org/repos/asf/flink/blob/f9b475f5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java index ac34c62..124a5b7 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -53,7 +54,7 @@ import static org.junit.Assert.assertTrue; * <p>This test must be in the package it resides in, because it uses package-private methods * from the ExecutionGraph classes. */ -public class PipelinedRegionFailoverConcurrencyTest { +public class PipelinedRegionFailoverConcurrencyTest extends TestLogger { /** * Tests that a cancellation concurrent to a local failover leads to a properly http://git-wip-us.apache.org/repos/asf/flink/blob/f9b475f5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java index 501bedd..682705a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java @@ -48,14 +48,21 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway { private Optional<Consumer<ExecutionAttemptID>> optSubmitCondition; + private Optional<Consumer<ExecutionAttemptID>> optCancelCondition; + public SimpleAckingTaskManagerGateway() { optSubmitCondition = Optional.empty(); + 
optCancelCondition = Optional.empty(); } public void setCondition(Consumer<ExecutionAttemptID> predicate) { optSubmitCondition = Optional.of(predicate); } + public void setCancelCondition(Consumer<ExecutionAttemptID> predicate) { + optCancelCondition = Optional.of(predicate); + } + @Override public String getAddress() { return address; @@ -96,6 +103,7 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway { @Override public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) { + optCancelCondition.ifPresent(condition -> condition.accept(executionAttemptID)); return CompletableFuture.completedFuture(Acknowledge.get()); }
