[FLINK-6293] [tests] Harden JobManagerITCase

One of the unit tests in JobManagerITCase starts a MiniCluster and sends a LeaderSessionMessage to the JobManager without waiting until the JobManager has gained leadership. As a result, the TriggerSavepoint message can be dropped, which causes the test to deadlock.
This PR fixes the problem by explicitly waiting for the JobManager to become the leader.

This closes #3796.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f3da8f69
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f3da8f69
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f3da8f69

Branch: refs/heads/table-retraction
Commit: f3da8f69e99be49068ab4ea3abc5e1c4eba7bf32
Parents: 7c35dc0
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Fri Apr 28 10:04:57 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri Apr 28 15:25:44 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/jobmanager/JobManagerITCase.scala | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f3da8f69/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index ce8517e..5fb9ddf 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -765,6 +765,11 @@ class JobManagerITCase(_system: ActorSystem)
         val jobManager = flinkCluster
           .getLeaderGateway(deadline.timeLeft)
 
+        // we have to make sure that the job manager knows also that he is the leader
+        // in case of standalone leader retrieval this can happen after the getLeaderGateway call
+        val leaderFuture = jobManager.ask(NotifyWhenLeader, timeout.duration)
+        Await.ready(leaderFuture, timeout.duration)
+
         val jobId = new JobID()
 
         // Trigger savepoint for non-existing job
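The race fixed by this commit can be illustrated outside of Flink. The following is a minimal sketch using only `scala.concurrent` (no Akka, no MiniCluster). `ToyLeader`, `notifyWhenLeader`, `grantLeadership` and `LeaderWaitDemo` are hypothetical names chosen for illustration and are not Flink APIs. The sketch assumes, as the commit message describes, that a component which has not yet gained leadership drops incoming messages. Its `notifyWhenLeader()` future plays the role of the `NotifyWhenLeader` ask in the diff.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for a JobManager: it only handles messages after it has
// been granted leadership and silently drops anything that arrives earlier.
final class ToyLeader {
  private val leaderPromise = Promise[Unit]()
  private val handled = ListBuffer.empty[String]

  // Simulates the leader election service granting leadership at some later point.
  def grantLeadership(): Unit = leaderPromise.trySuccess(())

  // Analogous to asking the JobManager with NotifyWhenLeader: completes once leader.
  def notifyWhenLeader(): Future[Unit] = leaderPromise.future

  // Returns true if the message was handled, false if it was dropped.
  def tell(msg: String): Boolean = synchronized {
    if (leaderPromise.isCompleted) { handled += msg; true } else false
  }

  def handledMessages: List[String] = synchronized(handled.toList)
}

object LeaderWaitDemo {
  /** Returns (handled before leadership, handled after waiting for leadership). */
  def run(): (Boolean, Boolean) = {
    val jm = new ToyLeader

    // Without waiting: the message arrives before leadership and is dropped.
    val early = jm.tell("TriggerSavepoint")

    // Leadership is granted asynchronously, some time after the gateway is known.
    Future { Thread.sleep(50); jm.grantLeadership() }

    // The pattern from the fix: block until the component confirms it is the leader.
    Await.ready(jm.notifyWhenLeader(), 5.seconds)
    val late = jm.tell("TriggerSavepoint")

    (early, late)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Running `LeaderWaitDemo.main` prints `(false,true)`: the first message is lost, while the message sent after awaiting leadership is handled. As in the diff, `Await.ready` is bounded by a timeout, so a component that never becomes leader fails the test with a `TimeoutException` instead of hanging indefinitely.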