Repository: flink Updated Branches: refs/heads/master 6590a4c36 -> 087000751
[FLINK-4314] [tests] Fix test instability in JobManagerHAJobGraphRecoveryITCase

The test was relying on the JobManager shutting down before the TaskManager, which is not necessarily the case. If the TaskManager shuts down before the JobManager, the JobGraph could reach the final state FAILED, in which case all HA state is removed. To circumvent this, we add a restart strategy.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/08700075 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/08700075 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/08700075 Branch: refs/heads/master Commit: 087000751bbb08cd90cd9bb10577ddc4739f9628 Parents: 6590a4c Author: Ufuk Celebi <u...@apache.org> Authored: Fri Aug 5 11:49:42 2016 +0200 Committer: Ufuk Celebi <u...@apache.org> Committed: Fri Aug 5 11:51:51 2016 +0200 ---------------------------------------------------------------------- .../test/recovery/JobManagerHAJobGraphRecoveryITCase.java | 9 +++++++++ 1 file changed, 9 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/08700075/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java index 5a10604..eccf971 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java @@ -25,6 +25,8 @@ import akka.actor.UntypedActor; import akka.testkit.TestActorRef; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.TrueFileFilter; +import 
org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -140,6 +142,13 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger { JobGraph jobGraph = createBlockingJobGraph(); + // Set restart strategy to guard against shut down races. + // If the TM fails before the JM, it might happen that the + // Job is failed, leading to state removal. + ExecutionConfig ec = new ExecutionConfig(); + ec.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 100)); + jobGraph.setExecutionConfig(ec); + ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft()); // Submit the job