Repository: flink
Updated Branches:
  refs/heads/master 6590a4c36 -> 087000751


[FLINK-4314] [tests] Fix test instability in JobManagerHAJobGraphRecoveryITCase

The test was relying on the JobManager shutting down before the
TaskManager, which is not necessarily the case. If the TaskManager
shuts down before the JobManager, the JobGraph could reach the final
state FAILED, in which case all HA state is removed.

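To circumvent this, we add a fixed-delay restart strategy (effectively
unlimited restart attempts, 100 ms delay) to the submitted job, so that a
TaskManager failure during shutdown triggers a restart instead of driving
the job to the final state FAILED.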


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/08700075
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/08700075
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/08700075

Branch: refs/heads/master
Commit: 087000751bbb08cd90cd9bb10577ddc4739f9628
Parents: 6590a4c
Author: Ufuk Celebi <u...@apache.org>
Authored: Fri Aug 5 11:49:42 2016 +0200
Committer: Ufuk Celebi <u...@apache.org>
Committed: Fri Aug 5 11:51:51 2016 +0200

----------------------------------------------------------------------
 .../test/recovery/JobManagerHAJobGraphRecoveryITCase.java   | 9 +++++++++
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/08700075/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index 5a10604..eccf971 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -25,6 +25,8 @@ import akka.actor.UntypedActor;
 import akka.testkit.TestActorRef;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -140,6 +142,13 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 
                        JobGraph jobGraph = createBlockingJobGraph();
 
+                       // Set restart strategy to guard against shut down races.
+                       // If the TM fails before the JM, it might happen that the
+                       // Job is failed, leading to state removal.
+                       ExecutionConfig ec = new ExecutionConfig();
+                       ec.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 100));
+                       jobGraph.setExecutionConfig(ec);
+
                        ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft());
 
                        // Submit the job

Reply via email to