[FLINK-6134] Set UUID(0L, 0L) as default leader session id

Before the default leader session id was null in the standalone case. However, 
in the ZooKeeper
case null indicated that there was no active leader available. With this 
commit, the default
leader id will be set to UUID(0L, 0L). This allows the uniform treatment of 
null denoting that
there is no active leader across the standalone and the ZooKeeper case.

With this change, the FlinkActors will now ignore all LeaderSessionMessages if 
the actors's
leader id field is null. This indicates that the FlinkActor does not know the 
current leader.

This closes #3578.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2dfd463e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2dfd463e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2dfd463e

Branch: refs/heads/master
Commit: 2dfd463e2a3ca2cc1428753b51a980f181a468a9
Parents: 034d9a3
Author: Till Rohrmann <[email protected]>
Authored: Mon Mar 20 18:15:57 2017 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Wed Mar 22 10:26:40 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/client/program/ClientTest.java | 37 ++++----
 .../MesosFlinkResourceManagerTest.java          |  3 +-
 .../BackPressureStatsTrackerITCase.java         |  3 +-
 .../StackTraceSampleCoordinatorITCase.java      |  3 +-
 .../flink/runtime/akka/FlinkUntypedActor.java   | 14 ++-
 .../flink/runtime/client/JobClientActor.java    | 98 ++++++++++++++------
 .../clusterframework/FlinkResourceManager.java  |  4 +-
 .../runtime/instance/AkkaActorGateway.java      |  5 +-
 .../StandaloneLeaderElectionService.java        |  3 +-
 .../StandaloneLeaderRetrievalService.java       |  4 +-
 .../runtime/LeaderSessionMessageFilter.scala    | 21 +++--
 .../runtime/messages/JobClientMessages.scala    |  9 +-
 .../runtime/messages/JobManagerMessages.scala   |  2 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  5 +-
 .../runtime/client/JobClientActorTest.java      | 29 +++---
 .../jobmanager/JobManagerHARecoveryTest.java    |  5 +-
 .../runtime/jobmanager/JobManagerTest.java      | 41 ++++----
 .../flink/runtime/jobmaster/JobMasterTest.java  |  4 +-
 .../LeaderElectionRetrievalTestingCluster.java  |  8 +-
 .../StandaloneLeaderElectionTest.java           |  5 +-
 .../TestingLeaderRetrievalService.java          |  6 +-
 .../runtime/metrics/TaskManagerMetricsTest.java |  7 +-
 .../AkkaKvStateLocationLookupServiceTest.java   | 24 +++--
 .../resourcemanager/JobLeaderIdServiceTest.java | 15 ++-
 .../ResourceManagerJobMasterTest.java           | 13 ++-
 .../taskexecutor/TaskExecutorITCase.java        |  2 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 24 +++--
 .../TaskManagerRegistrationTest.java            |  3 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  7 +-
 .../src/test/resources/log4j-test.properties    |  2 +-
 .../jobmanager/JobManagerRegistrationTest.scala | 24 +++--
 .../testingUtils/ScalaTestingUtils.scala        |  3 +-
 .../runtime/testingUtils/TestingUtils.scala     | 15 +--
 .../JobManagerHAJobGraphRecoveryITCase.java     | 88 ------------------
 .../LocalFlinkMiniClusterITCase.java            |  3 +-
 .../java/org/apache/flink/yarn/UtilsTest.java   |  4 +-
 36 files changed, 292 insertions(+), 251 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 96785f4..75cb0e7 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -42,11 +42,13 @@ import 
org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.NetUtils;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -54,6 +56,7 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.io.IOException;
 import java.net.URL;
 import java.util.Collections;
 import java.util.UUID;
@@ -68,7 +71,7 @@ import static org.mockito.Mockito.when;
 /**
  * Simple and maybe stupid test to check the {@link ClusterClient} class.
  */
-public class ClientTest {
+public class ClientTest extends TestLogger {
 
        private PackagedProgram program;
 
@@ -217,27 +220,21 @@ public class ClientTest {
         * This test verifies correct that the correct exception is thrown when 
the job submission fails.
         */
        @Test
-       public void shouldSubmitToJobClientFails() {
-               try {
-                       
jobManagerSystem.actorOf(Props.create(FailureReturningActor.class), 
JobManager.JOB_MANAGER_NAME());
+       public void shouldSubmitToJobClientFails() throws IOException {
+               
jobManagerSystem.actorOf(Props.create(FailureReturningActor.class), 
JobManager.JOB_MANAGER_NAME());
 
-                       ClusterClient out = new StandaloneClusterClient(config);
-                       out.setDetached(true);
+               ClusterClient out = new StandaloneClusterClient(config);
+               out.setDetached(true);
 
-                       try {
-                               out.run(program.getPlanWithJars(), 1);
-                               fail("This should fail with an exception");
-                       }
-                       catch (ProgramInvocationException e) {
-                               // bam!
-                       }
-                       catch (Exception e) {
-                               fail("wrong exception " + e);
-                       }
+               try {
+                       out.run(program.getPlanWithJars(), 1);
+                       fail("This should fail with an exception");
+               }
+               catch (ProgramInvocationException e) {
+                       // bam!
                }
                catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
+                       fail("wrong exception " + e);
                }
        }
 
@@ -308,7 +305,7 @@ public class ClientTest {
 
        public static class SuccessReturningActor extends FlinkUntypedActor {
 
-               private UUID leaderSessionID = null;
+               private UUID leaderSessionID = 
HighAvailabilityServices.DEFAULT_LEADER_ID;
 
                @Override
                public void handleMessage(Object message) {
@@ -338,7 +335,7 @@ public class ClientTest {
 
        public static class FailureReturningActor extends FlinkUntypedActor {
 
-               private UUID leaderSessionID = null;
+               private UUID leaderSessionID = 
HighAvailabilityServices.DEFAULT_LEADER_ID;
 
                @Override
                public void handleMessage(Object message) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index dcf6a82..c854a17 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -41,6 +41,7 @@ import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameter
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.clusterframework.messages.*;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -208,7 +209,7 @@ public class MesosFlinkResourceManagerTest extends 
TestLogger {
                                        TestingMesosFlinkResourceManager.class,
                                        config, mesosConfig, workerStore, 
retrievalService, tmParams, containerSpecification, artifactResolver, LOG));
                        resourceManagerInstance = 
resourceManagerRef.underlyingActor();
-                       resourceManager = new 
AkkaActorGateway(resourceManagerRef, null);
+                       resourceManager = new 
AkkaActorGateway(resourceManagerRef, 
HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                        verify(schedulerDriver).start();
                        
resourceManagerInstance.connectionMonitor.expectMsgClass(ConnectionMonitor.Start.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 46f8be6..e80c509 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -138,7 +139,7 @@ public class BackPressureStatsTrackerITCase extends 
TestLogger {
                                        @Override
                                        protected void run() {
                                                try {
-                                                       ActorGateway testActor 
= new AkkaActorGateway(getTestActor(), null);
+                                                       ActorGateway testActor 
= new AkkaActorGateway(getTestActor(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                                                        // Submit the job and 
wait until it is running
                                                        
JobClient.submitJobDetached(

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index a44e212..5463384 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -110,7 +111,7 @@ public class StackTraceSampleCoordinatorITCase extends 
TestLogger {
                                        @Override
                                        protected void run() {
                                                try {
-                                                       ActorGateway testActor 
= new AkkaActorGateway(getTestActor(), null);
+                                                       ActorGateway testActor 
= new AkkaActorGateway(getTestActor(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                                                        int maxAttempts = 10;
                                                        int sleepTime = 100;

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
index 3255778..05ae501 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
@@ -84,10 +84,14 @@ public abstract class FlinkUntypedActor extends 
UntypedActor {
                        UUID expectedID = getLeaderSessionID();
                        UUID actualID = msg.leaderSessionID();
 
-                       if(expectedID == actualID || (expectedID != null && 
expectedID.equals(actualID))) {
-                               handleMessage(msg.message());
+                       if (expectedID != null) {
+                               if (expectedID.equals(actualID)) {
+                                       handleMessage(msg.message());
+                               } else {
+                                       handleDiscardedMessage(expectedID, msg);
+                               }
                        } else {
-                               handleDiscardedMessage(expectedID, msg);
+                               handleNoLeaderId(msg);
                        }
                } else if (message instanceof RequiresLeaderSessionID) {
                        throw new Exception("Received a message " + message + " 
without a leader session " +
@@ -104,6 +108,10 @@ public abstract class FlinkUntypedActor extends 
UntypedActor {
                                msg.leaderSessionID());
        }
 
+       private void handleNoLeaderId(LeaderSessionMessage msg) {
+               LOG.warn("Discard message {} because there is currently no 
valid leader id known.", msg);
+       }
+
        /**
         * This method contains the actor logic which defines how to react to 
incoming messages.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
index 1380e76..368a2b6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.client;
 
 import akka.actor.ActorRef;
+import akka.actor.Cancellable;
 import akka.actor.PoisonPill;
 import akka.actor.Status;
 import akka.actor.Terminated;
@@ -36,6 +37,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.util.Preconditions;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.util.Objects;
 import java.util.UUID;
 
 
@@ -66,6 +68,10 @@ public abstract class JobClientActor extends 
FlinkUntypedActor implements Leader
        /** The client which the actor is responsible for */
        protected ActorRef client;
 
+       private Cancellable connectionTimeout;
+
+       private UUID connectionTimeoutId;
+
        public JobClientActor(
                        LeaderRetrievalService leaderRetrievalService,
                        FiniteDuration timeout,
@@ -73,6 +79,11 @@ public abstract class JobClientActor extends 
FlinkUntypedActor implements Leader
                this.leaderRetrievalService = 
Preconditions.checkNotNull(leaderRetrievalService);
                this.timeout = Preconditions.checkNotNull(timeout);
                this.sysoutUpdates = sysoutUpdates;
+               this.jobManager = ActorRef.noSender();
+               this.leaderSessionID = null;
+
+               connectionTimeout = null;
+               connectionTimeoutId = null;
        }
 
        @Override
@@ -146,6 +157,9 @@ public abstract class JobClientActor extends 
FlinkUntypedActor implements Leader
                                                        
getSelf().tell(decorateMessage(new JobManagerActorRef(result)), 
ActorRef.noSender());
                                                }
                                        }, getContext().dispatcher());
+                       } else if (isClientConnected() && connectionTimeoutId 
== null) {
+                               // msg.address == null means that the leader 
has lost its leadership
+                               registerConnectionTimeout();
                        }
                } else if (message instanceof JobManagerActorRef) {
                        // Resolved JobManager ActorRef
@@ -184,36 +198,35 @@ public abstract class JobClientActor extends 
FlinkUntypedActor implements Leader
                                        jobManager.path());
                                disconnectFromJobManager();
 
-                               // we only issue a connection timeout if we 
have submitted a job before
-                               // otherwise, we might have some more time to 
find another job manager
-                               // Important: The ConnectionTimeout message is 
filtered out in case that we are
-                               // notified about a new leader by setting the 
new leader session ID, because
-                               // ConnectionTimeout extends 
RequiresLeaderSessionID
                                if (isClientConnected()) {
-                                       
getContext().system().scheduler().scheduleOnce(
-                                               timeout,
-                                               getSelf(),
-                                               
decorateMessage(JobClientMessages.getConnectionTimeout()),
-                                               getContext().dispatcher(),
-                                               ActorRef.noSender());
+                                       if (connectionTimeoutId == null) {
+                                               // only register a connection 
timeout if we haven't done this before
+                                               registerConnectionTimeout();
+                                       }
                                }
                        } else {
                                LOG.warn("Received 'Terminated' for unknown 
actor " + target);
                        }
                }
-               else if 
(JobClientMessages.getConnectionTimeout().equals(message)) {
-                       // check if we haven't found a job manager yet
-                       if (!isJobManagerConnected()) {
-                               final JobClientActorConnectionTimeoutException 
errorMessage =
-                                       new 
JobClientActorConnectionTimeoutException("Lost connection to the JobManager.");
-                               final Object replyMessage = decorateMessage(new 
Status.Failure(errorMessage));
-                               if (isClientConnected()) {
-                                       client.tell(
-                                               replyMessage,
-                                               getSelf());
+               else if (message instanceof 
JobClientMessages.ConnectionTimeout) {
+                       JobClientMessages.ConnectionTimeout timeoutMessage = 
(JobClientMessages.ConnectionTimeout) message;
+
+                       if (Objects.equals(connectionTimeoutId, 
timeoutMessage.id())) {
+                               // check if we haven't found a job manager yet
+                               if (!isJobManagerConnected()) {
+                                       final 
JobClientActorConnectionTimeoutException errorMessage =
+                                               new 
JobClientActorConnectionTimeoutException("Lost connection to the JobManager.");
+                                       final Object replyMessage = 
decorateMessage(new Status.Failure(errorMessage));
+                                       if (isClientConnected()) {
+                                               client.tell(
+                                                       replyMessage,
+                                                       getSelf());
+                                       }
+                                       // Connection timeout reached, let's 
terminate
+                                       terminate();
                                }
-                               // Connection timeout reached, let's terminate
-                               terminate();
+                       } else {
+                               LOG.debug("Received outdated connection 
timeout.");
                        }
                }
 
@@ -225,13 +238,10 @@ public abstract class JobClientActor extends 
FlinkUntypedActor implements Leader
                                message);
                        // We want to submit/attach to a job, but we haven't 
found a job manager yet.
                        // Let's give him another chance to find a job manager 
within the given timeout.
-                       getContext().system().scheduler().scheduleOnce(
-                               timeout,
-                               getSelf(),
-                               
decorateMessage(JobClientMessages.getConnectionTimeout()),
-                               getContext().dispatcher(),
-                               ActorRef.noSender()
-                       );
+                       if (connectionTimeoutId == null) {
+                               // only register the connection timeout once
+                               registerConnectionTimeout();
+                       }
                        handleCustomMessage(message);
                }
                else {
@@ -304,6 +314,8 @@ public abstract class JobClientActor extends 
FlinkUntypedActor implements Leader
                        getContext().unwatch(jobManager);
                        jobManager = ActorRef.noSender();
                }
+
+               leaderSessionID = null;
        }
 
        private void connectToJobManager(ActorRef jobManager) {
@@ -316,6 +328,8 @@ public abstract class JobClientActor extends 
FlinkUntypedActor implements Leader
 
                this.jobManager = jobManager;
                getContext().watch(jobManager);
+
+               unregisterConnectionTimeout();
        }
 
        protected void terminate() {
@@ -333,4 +347,28 @@ public abstract class JobClientActor extends 
FlinkUntypedActor implements Leader
                return client != ActorRef.noSender();
        }
 
+       private void registerConnectionTimeout() {
+               if (connectionTimeout != null) {
+                       connectionTimeout.cancel();
+               }
+
+               connectionTimeoutId = UUID.randomUUID();
+
+               connectionTimeout = 
getContext().system().scheduler().scheduleOnce(
+                       timeout,
+                       getSelf(),
+                       decorateMessage(new 
JobClientMessages.ConnectionTimeout(connectionTimeoutId)),
+                       getContext().dispatcher(),
+                       ActorRef.noSender()
+               );
+       }
+
+       private void unregisterConnectionTimeout() {
+               if (connectionTimeout != null) {
+                       connectionTimeout.cancel();
+                       connectionTimeout = null;
+                       connectionTimeoutId = null;
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
index 911c1f6..77dbad4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
@@ -410,9 +410,9 @@ public abstract class FlinkResourceManager<WorkerType 
extends ResourceIDRetrieva
                // disconnect from the current leader (no-op if no leader yet)
                jobManagerLostLeadership();
 
-               // a null leader address means that only a leader disconnect
+               // a null leader session id means that only a leader disconnect
                // happened, without a new leader yet
-               if (leaderAddress != null) {
+               if (leaderSessionID != null && leaderAddress != null) {
                        // the leaderSessionID implicitly filters out success 
and failure messages
                        // that come after leadership changed again
                        this.leaderSessionID = leaderSessionID;

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
index adeae03..26b6176 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
@@ -24,6 +24,7 @@ import akka.util.Timeout;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.messages.LeaderSessionMessageDecorator;
 import org.apache.flink.runtime.messages.MessageDecorator;
+import org.apache.flink.util.Preconditions;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -48,8 +49,8 @@ public class AkkaActorGateway implements ActorGateway, 
Serializable {
        private final MessageDecorator decorator;
 
        public AkkaActorGateway(ActorRef actor, UUID leaderSessionID) {
-               this.actor = actor;
-               this.leaderSessionID = leaderSessionID;
+               this.actor = Preconditions.checkNotNull(actor);
+               this.leaderSessionID = 
Preconditions.checkNotNull(leaderSessionID);
                // we want to wrap RequiresLeaderSessionID messages in a 
LeaderSessionMessage
                this.decorator = new 
LeaderSessionMessageDecorator(leaderSessionID);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
index 2d36616..a956a5e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.util.Preconditions;
 
 import java.util.UUID;
@@ -42,7 +43,7 @@ public class StandaloneLeaderElectionService implements 
LeaderElectionService {
                contender = Preconditions.checkNotNull(newContender);
 
                // directly grant leadership to the given contender
-               contender.grantLeadership(null);
+               
contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
index 4ad4646..174e106 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.leaderretrieval;
 
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+
 import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -54,7 +56,7 @@ public class StandaloneLeaderRetrievalService implements 
LeaderRetrievalService
        @Deprecated
        public StandaloneLeaderRetrievalService(String leaderAddress) {
                this.leaderAddress = checkNotNull(leaderAddress);
-               this.leaderId = null;
+               this.leaderId = HighAvailabilityServices.DEFAULT_LEADER_ID;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala
index 1fb32ce..1a6be25 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala
@@ -32,10 +32,15 @@ trait LeaderSessionMessageFilter extends FlinkActor {
 
   abstract override def receive: Receive = {
     case leaderMessage @ LeaderSessionMessage(msgID, msg) =>
-      if (leaderSessionID.equals(Option(msgID))) {
-        super.receive(msg)
-      } else {
-        handleDiscardedMessage(leaderSessionID, leaderMessage)
+      leaderSessionID match {
+        case Some(leaderId) =>
+          if (leaderId.equals(msgID)) {
+            super.receive(msg)
+          } else {
+            handleDiscardedMessage(leaderId, leaderMessage)
+          }
+        case None =>
+          handleNoLeaderId(leaderMessage)
       }
     case msg: RequiresLeaderSessionID =>
       throw new Exception(s"Received a message $msg without a leader session 
ID, even though" +
@@ -45,12 +50,16 @@ trait LeaderSessionMessageFilter extends FlinkActor {
   }
 
   private def handleDiscardedMessage(
-      expectedLeaderSessionID: Option[UUID],
+      expectedLeaderSessionID: UUID,
       msg: LeaderSessionMessage)
     : Unit = {
     log.warn(s"Discard message $msg because the expected leader session ID " +
       s"$expectedLeaderSessionID did not equal the received leader session ID 
" +
-      s"${Option(msg.leaderSessionID)}.")
+      s"${msg.leaderSessionID}.")
+  }
+
+  private def handleNoLeaderId(msg: LeaderSessionMessage): Unit = {
+    log.warn(s"Discard message $msg because there is currently no valid leader 
id known.")
   }
 
   /** Wrap [[RequiresLeaderSessionID]] messages in a [[LeaderSessionMessage]]

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
index 1f29e32..7268b0f 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
@@ -66,10 +66,13 @@ object JobClientMessages {
   /** Message which is triggered when the JobClient registration at the 
JobManager times out */
   case object RegistrationTimeout extends RequiresLeaderSessionID
 
-  /** Message which is triggered when the connection timeout has been reached. 
*/
-  case object ConnectionTimeout extends RequiresLeaderSessionID
+  /**
+    * Message which is triggered when the connection timeout has been reached.
+    *
+    * @param id Timeout id which identifies the concurrent timeouts
+    */
+  case class ConnectionTimeout(id: UUID)
 
   def getSubmissionTimeout(): AnyRef = SubmissionTimeout
   def getRegistrationTimeout(): AnyRef = RegistrationTimeout
-  def getConnectionTimeout(): AnyRef = ConnectionTimeout
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index f3025ab..4db2584 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -45,7 +45,7 @@ object JobManagerMessages {
     * [[RequiresLeaderSessionID]] interface and have to be wrapped in a 
[[LeaderSessionMessage]],
     * which also contains the current leader session ID.
     *
-    * @param leaderSessionID Current leader session ID or null, if no leader 
session ID was set
+    * @param leaderSessionID Current leader session ID
     * @param message [[RequiresLeaderSessionID]] message to be wrapped in a 
[[LeaderSessionMessage]]
     */
   case class LeaderSessionMessage(leaderSessionID: UUID, message: Any)

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 25d5366..2e8a6fa 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1434,7 +1434,10 @@ class TaskManager(
     this.jobManagerAkkaURL = Option(newJobManagerAkkaURL)
     this.leaderSessionID = Option(leaderSessionID)
 
-    triggerTaskManagerRegistration()
+    if (this.leaderSessionID.isDefined) {
+      // only trigger the registration if we have obtained a valid leader id 
(!= null)
+      triggerTaskManagerRegistration()
+    }
   }
 
   /** Starts the TaskManager's registration process to connect to the 
JobManager.

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
index 2e3384f..0ec00df 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
@@ -28,6 +28,7 @@ import akka.util.Timeout;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -73,7 +74,7 @@ public class JobClientActorTest extends TestLogger {
         */
        @Test(expected=JobClientActorSubmissionTimeoutException.class)
        public void testSubmissionTimeout() throws Exception {
-               FiniteDuration jobClientActorTimeout = new FiniteDuration(5, 
TimeUnit.SECONDS);
+               FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, 
TimeUnit.SECONDS);
                FiniteDuration timeout = jobClientActorTimeout.$times(2);
 
                UUID leaderSessionID = UUID.randomUUID();
@@ -112,7 +113,7 @@ public class JobClientActorTest extends TestLogger {
         */
        @Test(expected=JobClientActorRegistrationTimeoutException.class)
        public void testRegistrationTimeout() throws Exception {
-               FiniteDuration jobClientActorTimeout = new FiniteDuration(5, 
TimeUnit.SECONDS);
+               FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, 
TimeUnit.SECONDS);
                FiniteDuration timeout = jobClientActorTimeout.$times(2);
 
                UUID leaderSessionID = UUID.randomUUID();
@@ -142,17 +143,19 @@ public class JobClientActorTest extends TestLogger {
                Await.result(jobExecutionResult, timeout);
        }
 
-       /** Tests that a {@link 
org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException}
+       /** Tests that a {@link JobClientActorConnectionTimeoutException}
         * is thrown when the JobSubmissionClientActor wants to submit a job 
but has not connected to a JobManager.
         *
         * @throws Exception
         */
        @Test(expected=JobClientActorConnectionTimeoutException.class)
        public void testConnectionTimeoutWithoutJobManagerForSubmission() 
throws Exception {
-               FiniteDuration jobClientActorTimeout = new FiniteDuration(5, 
TimeUnit.SECONDS);
+               FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, 
TimeUnit.SECONDS);
                FiniteDuration timeout = jobClientActorTimeout.$times(2);
 
-               TestingLeaderRetrievalService testingLeaderRetrievalService = 
new TestingLeaderRetrievalService();
+               TestingLeaderRetrievalService testingLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+                       "localhost",
+                       HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                Props jobClientActorProps = 
JobSubmissionClientActor.createActorProps(
                        testingLeaderRetrievalService,
@@ -170,16 +173,18 @@ public class JobClientActorTest extends TestLogger {
                Await.result(jobExecutionResult, timeout);
        }
 
-       /** Tests that a {@link 
org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException}
+       /** Tests that a {@link JobClientActorConnectionTimeoutException}
         * is thrown when the JobAttachmentClientActor attach to a job at the 
JobManager
         * but has not connected to a JobManager.
         */
        @Test(expected=JobClientActorConnectionTimeoutException.class)
        public void testConnectionTimeoutWithoutJobManagerForRegistration() 
throws Exception {
-               FiniteDuration jobClientActorTimeout = new FiniteDuration(5, 
TimeUnit.SECONDS);
+               FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, 
TimeUnit.SECONDS);
                FiniteDuration timeout = jobClientActorTimeout.$times(2);
 
-               TestingLeaderRetrievalService testingLeaderRetrievalService = 
new TestingLeaderRetrievalService();
+               TestingLeaderRetrievalService testingLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+                       "localhost",
+                       HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                Props jobClientActorProps = 
JobAttachmentClientActor.createActorProps(
                        testingLeaderRetrievalService,
@@ -203,7 +208,7 @@ public class JobClientActorTest extends TestLogger {
         */
        @Test(expected=JobClientActorConnectionTimeoutException.class)
        public void testConnectionTimeoutAfterJobSubmission() throws Exception {
-               FiniteDuration jobClientActorTimeout = new FiniteDuration(5, 
TimeUnit.SECONDS);
+               FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, 
TimeUnit.SECONDS);
                FiniteDuration timeout = jobClientActorTimeout.$times(2);
 
                UUID leaderSessionID = UUID.randomUUID();
@@ -245,8 +250,8 @@ public class JobClientActorTest extends TestLogger {
         */
        @Test(expected=JobClientActorConnectionTimeoutException.class)
        public void testConnectionTimeoutAfterJobRegistration() throws 
Exception {
-               FiniteDuration jobClientActorTimeout = new FiniteDuration(5, 
TimeUnit.SECONDS);
-               FiniteDuration timeout = jobClientActorTimeout.$times(2);
+               FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, 
TimeUnit.SECONDS);
+               FiniteDuration timeout = jobClientActorTimeout.$times(2L);
 
                UUID leaderSessionID = UUID.randomUUID();
 
@@ -287,7 +292,7 @@ public class JobClientActorTest extends TestLogger {
         */
        @Test
        public void testGuaranteedAnswerIfJobClientDies() throws Exception {
-               FiniteDuration timeout = new FiniteDuration(2, 
TimeUnit.SECONDS);
+               FiniteDuration timeout = new FiniteDuration(2L, 
TimeUnit.SECONDS);
 
                        UUID leaderSessionID = UUID.randomUUID();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 32358c0..2ebc36e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -50,6 +50,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.InstanceManager;
@@ -166,7 +167,9 @@ public class JobManagerHARecoveryTest {
                        CheckpointIDCounter checkpointCounter = new 
StandaloneCheckpointIDCounter();
                        CheckpointRecoveryFactory checkpointStateFactory = new 
MyCheckpointRecoveryFactory(checkpointStore, checkpointCounter);
                        TestingLeaderElectionService myLeaderElectionService = 
new TestingLeaderElectionService();
-                       TestingLeaderRetrievalService myLeaderRetrievalService 
= new TestingLeaderRetrievalService();
+                       TestingLeaderRetrievalService myLeaderRetrievalService 
= new TestingLeaderRetrievalService(
+                               "localhost",
+                               HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                        InstanceManager instanceManager = new InstanceManager();
                        instanceManager.addInstanceListener(scheduler);

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 727fc65..3944752c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -86,6 +87,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -120,7 +122,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class JobManagerTest {
+public class JobManagerTest extends TestLogger {
 
        @Rule
        public TemporaryFolder tmpFolder = new TemporaryFolder();
@@ -183,7 +185,7 @@ public class JobManagerTest {
                                                        
TestingUtils.TESTING_DURATION());
 
                                                // we can set the leader 
session ID to None because we don't use this gateway to send messages
-                                               final ActorGateway 
testActorGateway = new AkkaActorGateway(getTestActor(), null);
+                                               final ActorGateway 
testActorGateway = new AkkaActorGateway(getTestActor(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                                                // Submit the job and wait for 
all vertices to be running
                                                jobManagerGateway.tell(
@@ -304,7 +306,7 @@ public class JobManagerTest {
                                                        
TestingUtils.TESTING_DURATION());
 
                                                // we can set the leader 
session ID to None because we don't use this gateway to send messages
-                                               final ActorGateway 
testActorGateway = new AkkaActorGateway(getTestActor(), null);
+                                               final ActorGateway 
testActorGateway = new AkkaActorGateway(getTestActor(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                                                // Submit the job and wait for 
all vertices to be running
                                                jobManagerGateway.tell(
@@ -395,7 +397,7 @@ public class JobManagerTest {
                                                        
TestingUtils.TESTING_DURATION());
 
                                                // we can set the leader 
session ID to None because we don't use this gateway to send messages
-                                               final ActorGateway 
testActorGateway = new AkkaActorGateway(getTestActor(), null);
+                                               final ActorGateway 
testActorGateway = new AkkaActorGateway(getTestActor(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                                                // Submit the job and wait for 
all vertices to be running
                                                jobManagerGateway.tell(
@@ -478,7 +480,7 @@ public class JobManagerTest {
                                                final ActorGateway 
jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 
                                                // we can set the leader 
session ID to None because we don't use this gateway to send messages
-                                               final ActorGateway 
testActorGateway = new AkkaActorGateway(getTestActor(), null);
+                                               final ActorGateway 
testActorGateway = new AkkaActorGateway(getTestActor(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                                                // Submit the job and wait for 
all vertices to be running
                                                jobManagerGateway.tell(
@@ -530,7 +532,7 @@ public class JobManagerTest {
                                                final ActorGateway 
jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 
                                                // we can set the leader 
session ID to None because we don't use this gateway to send messages
-                                               final ActorGateway 
testActorGateway = new AkkaActorGateway(getTestActor(), null);
+                                               final ActorGateway 
testActorGateway = new AkkaActorGateway(getTestActor(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                                                // Submit the job and wait for 
all vertices to be running
                                                jobManagerGateway.tell(
@@ -572,7 +574,6 @@ public class JobManagerTest {
                Configuration config = new Configuration();
                config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100ms");
 
-               UUID leaderSessionId = null;
                ActorGateway jobManager = new AkkaActorGateway(
                                JobManager.startJobManagerActors(
                                        config,
@@ -581,7 +582,7 @@ public class JobManagerTest {
                                        TestingUtils.defaultExecutor(),
                                        TestingJobManager.class,
                                        MemoryArchivist.class)._1(),
-                               leaderSessionId);
+                               HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                LeaderRetrievalService leaderRetrievalService = new 
StandaloneLeaderRetrievalService(
                                AkkaUtils.getAkkaURL(system, 
jobManager.actor()));
@@ -794,8 +795,8 @@ public class JobManagerTest {
                                TestingJobManager.class,
                                TestingMemoryArchivist.class);
 
-                       jobManager = new AkkaActorGateway(master._1(), null);
-                       archiver = new AkkaActorGateway(master._2(), null);
+                       jobManager = new AkkaActorGateway(master._1(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
+                       archiver = new AkkaActorGateway(master._2(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                        ActorRef taskManagerRef = 
TaskManager.startTaskManagerComponentsAndActor(
                                        config,
@@ -807,7 +808,7 @@ public class JobManagerTest {
                                        true,
                                        TestingTaskManager.class);
 
-                       taskManager = new AkkaActorGateway(taskManagerRef, 
null);
+                       taskManager = new AkkaActorGateway(taskManagerRef, 
HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                        // Wait until connected
                        Object msg = new 
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
@@ -920,8 +921,8 @@ public class JobManagerTest {
                                TestingJobManager.class,
                                TestingMemoryArchivist.class);
 
-                       jobManager = new AkkaActorGateway(master._1(), null);
-                       archiver = new AkkaActorGateway(master._2(), null);
+                       jobManager = new AkkaActorGateway(master._1(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
+                       archiver = new AkkaActorGateway(master._2(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                        ActorRef taskManagerRef = 
TaskManager.startTaskManagerComponentsAndActor(
                                config,
@@ -933,7 +934,7 @@ public class JobManagerTest {
                                true,
                                TestingTaskManager.class);
 
-                       taskManager = new AkkaActorGateway(taskManagerRef, 
null);
+                       taskManager = new AkkaActorGateway(taskManagerRef, 
HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                        // Wait until connected
                        Object msg = new 
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
@@ -1026,8 +1027,8 @@ public class JobManagerTest {
                                TestingJobManager.class,
                                TestingMemoryArchivist.class);
 
-                       jobManager = new AkkaActorGateway(master._1(), null);
-                       archiver = new AkkaActorGateway(master._2(), null);
+                       jobManager = new AkkaActorGateway(master._1(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
+                       archiver = new AkkaActorGateway(master._2(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                        ActorRef taskManagerRef = 
TaskManager.startTaskManagerComponentsAndActor(
                                        config,
@@ -1039,7 +1040,7 @@ public class JobManagerTest {
                                        true,
                                        TestingTaskManager.class);
 
-                       taskManager = new AkkaActorGateway(taskManagerRef, 
null);
+                       taskManager = new AkkaActorGateway(taskManagerRef, 
HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                        // Wait until connected
                        Object msg = new 
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
@@ -1126,8 +1127,8 @@ public class JobManagerTest {
                                TestingJobManager.class,
                                TestingMemoryArchivist.class);
 
-                       jobManager = new AkkaActorGateway(master._1(), null);
-                       archiver = new AkkaActorGateway(master._2(), null);
+                       jobManager = new AkkaActorGateway(master._1(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
+                       archiver = new AkkaActorGateway(master._2(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                        Configuration tmConfig = new Configuration();
                        
tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
@@ -1142,7 +1143,7 @@ public class JobManagerTest {
                                        true,
                                        TestingTaskManager.class);
 
-                       taskManager = new AkkaActorGateway(taskManagerRef, 
null);
+                       taskManager = new AkkaActorGateway(taskManagerRef, 
HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                        // Wait until connected
                        Object msg = new 
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 567a8fc..43536b6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -64,7 +64,9 @@ public class JobMasterTest extends TestLogger {
        @Test
        public void testHeartbeatTimeoutWithTaskManager() throws Exception {
                final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
-               final TestingLeaderRetrievalService rmLeaderRetrievalService = 
new TestingLeaderRetrievalService();
+               final TestingLeaderRetrievalService rmLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+                       null,
+                       null);
                
haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
                
haServices.setCheckpointRecoveryFactory(mock(CheckpointRecoveryFactory.class));
                final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
index c143fe2..1cab0ea 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
@@ -53,8 +53,8 @@ public class LeaderElectionRetrievalTestingCluster extends 
TestingCluster {
                this.userConfiguration = userConfiguration;
                this.useSingleActorSystem = singleActorSystem;
 
-               leaderElectionServices = new 
ArrayList<TestingLeaderElectionService>();
-               leaderRetrievalServices = new 
ArrayList<TestingLeaderRetrievalService>();
+               leaderElectionServices = new ArrayList<>();
+               leaderRetrievalServices = new ArrayList<>();
        }
 
        @Override
@@ -78,7 +78,9 @@ public class LeaderElectionRetrievalTestingCluster extends 
TestingCluster {
 
        @Override
        public LeaderRetrievalService createLeaderRetrievalService() {
-               leaderRetrievalServices.add(new 
TestingLeaderRetrievalService());
+               leaderRetrievalServices.add(new TestingLeaderRetrievalService(
+                       null,
+                       null));
 
                return 
leaderRetrievalServices.get(leaderRetrievalServices.size() - 1);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java
index b04be63..18b620f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -46,12 +47,12 @@ public class StandaloneLeaderElectionTest extends 
TestLogger {
                        contender.waitForLeader(1000l);
 
                        assertTrue(contender.isLeader());
-                       assertEquals(null, contender.getLeaderSessionID());
+                       
assertEquals(HighAvailabilityServices.DEFAULT_LEADER_ID, 
contender.getLeaderSessionID());
 
                        testingListener.waitForNewLeader(1000l);
 
                        assertEquals(TEST_URL, testingListener.getAddress());
-                       assertEquals(null, 
testingListener.getLeaderSessionID());
+                       
assertEquals(HighAvailabilityServices.DEFAULT_LEADER_ID, 
testingListener.getLeaderSessionID());
                } finally {
                        leaderElectionService.stop();
                        leaderRetrievalService.stop();

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
index d6bcaaf..887772a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
@@ -35,10 +35,6 @@ public class TestingLeaderRetrievalService implements 
LeaderRetrievalService {
 
        private volatile LeaderRetrievalListener listener;
 
-       public TestingLeaderRetrievalService() {
-               this(null, null);
-       }
-
        public TestingLeaderRetrievalService(String leaderAddress, UUID 
leaderSessionID) {
                this.leaderAddress = leaderAddress;
                this.leaderSessionID = leaderSessionID;
@@ -48,7 +44,7 @@ public class TestingLeaderRetrievalService implements 
LeaderRetrievalService {
        public void start(LeaderRetrievalListener listener) throws Exception {
                this.listener = Preconditions.checkNotNull(listener);
 
-               if (leaderAddress != null) {
+               if (leaderSessionID != null && leaderAddress != null) {
                        listener.notifyLeaderAddress(leaderAddress, 
leaderSessionID);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index aed2b6f..100c83d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -25,6 +25,7 @@ import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -109,7 +110,11 @@ public class TaskManagerMetricsTest {
                                                
expectMsgEquals(TaskManagerMessages.getRegisteredAtJobManagerMessage());
 
                                                // trigger re-registration of 
TM; this should include a disconnect from the current JM
-                                               taskManager.tell(new 
TaskManagerMessages.JobManagerLeaderAddress(jobManager.path().toString(), 
null), jobManager);
+                                               taskManager.tell(
+                                                       new 
TaskManagerMessages.JobManagerLeaderAddress(
+                                                               
jobManager.path().toString(),
+                                                               
HighAvailabilityServices.DEFAULT_LEADER_ID),
+                                                       jobManager);
 
                                                // wait for re-registration to 
be completed
                                                
taskManager.tell(TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java
index e9950fb..34e3174 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java
@@ -26,12 +26,14 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import 
org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategy;
 import 
org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategyFactory;
 import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -51,7 +53,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class AkkaKvStateLocationLookupServiceTest {
+public class AkkaKvStateLocationLookupServiceTest extends TestLogger {
 
        /** The default timeout. */
        private static final FiniteDuration TIMEOUT = new FiniteDuration(10, 
TimeUnit.SECONDS);
@@ -77,7 +79,9 @@ public class AkkaKvStateLocationLookupServiceTest {
         */
        @Test
        public void testNoJobManagerRegistered() throws Exception {
-               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService();
+               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService(
+                       null,
+                       null);
                Queue<LookupKvStateLocation> received = new 
LinkedBlockingQueue<>();
 
                AkkaKvStateLocationLookupService lookupService = new 
AkkaKvStateLocationLookupService(
@@ -108,7 +112,7 @@ public class AkkaKvStateLocationLookupServiceTest {
                //
                // Leader registration => communicate with new leader
                //
-               UUID leaderSessionId = null;
+               UUID leaderSessionId = 
HighAvailabilityServices.DEFAULT_LEADER_ID;
                KvStateLocation expected = new KvStateLocation(new JobID(), new 
JobVertexID(), 8282, "tea");
 
                ActorRef testActor = LookupResponseActor.create(received, 
leaderSessionId, expected);
@@ -154,7 +158,9 @@ public class AkkaKvStateLocationLookupServiceTest {
         */
        @Test
        public void testLeaderSessionIdChange() throws Exception {
-               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService();
+               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService(
+                       "localhost",
+                       HighAvailabilityServices.DEFAULT_LEADER_ID);
                Queue<LookupKvStateLocation> received = new 
LinkedBlockingQueue<>();
 
                AkkaKvStateLocationLookupService lookupService = new 
AkkaKvStateLocationLookupService(
@@ -216,7 +222,9 @@ public class AkkaKvStateLocationLookupServiceTest {
                                        }
                                };
 
-               final TestingLeaderRetrievalService leaderRetrievalService = 
new TestingLeaderRetrievalService();
+               final TestingLeaderRetrievalService leaderRetrievalService = 
new TestingLeaderRetrievalService(
+                       null,
+                       null);
 
                AkkaKvStateLocationLookupService lookupService = new 
AkkaKvStateLocationLookupService(
                                leaderRetrievalService,
@@ -268,7 +276,7 @@ public class AkkaKvStateLocationLookupServiceTest {
 
                        @Override
                        public boolean tryRetry() {
-                               
leaderRetrievalService.notifyListener(testActorAddress, null);
+                               
leaderRetrievalService.notifyListener(testActorAddress, 
HighAvailabilityServices.DEFAULT_LEADER_ID);
                                return true;
                        }
                });
@@ -279,7 +287,9 @@ public class AkkaKvStateLocationLookupServiceTest {
 
        @Test
        public void testUnexpectedResponseType() throws Exception {
-               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService();
+               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService(
+                       "localhost",
+                       HighAvailabilityServices.DEFAULT_LEADER_ID);
                Queue<LookupKvStateLocation> received = new 
LinkedBlockingQueue<>();
 
                AkkaKvStateLocationLookupService lookupService = new 
AkkaKvStateLocationLookupService(

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
index d5e99bd..149cc10 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.util.TestLogger;
@@ -64,7 +65,9 @@ public class JobLeaderIdServiceTest extends TestLogger {
                final String address = "foobar";
                final UUID leaderId = UUID.randomUUID();
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
-               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService();
+               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService(
+                       null,
+                       null);
 
                highAvailabilityServices.setJobMasterLeaderRetriever(jobId, 
leaderRetrievalService);
 
@@ -98,7 +101,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
        public void testRemovingJob() throws Exception {
                final JobID jobId = new JobID();
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
-               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService();
+               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService(null, null);
 
                highAvailabilityServices.setJobMasterLeaderRetriever(jobId, 
leaderRetrievalService);
 
@@ -139,7 +142,9 @@ public class JobLeaderIdServiceTest extends TestLogger {
        public void testInitialJobTimeout() throws Exception {
                final JobID jobId = new JobID();
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
-               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService();
+               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService(
+                       null,
+                       null);
 
                highAvailabilityServices.setJobMasterLeaderRetriever(jobId, 
leaderRetrievalService);
 
@@ -181,7 +186,9 @@ public class JobLeaderIdServiceTest extends TestLogger {
                final String address = "foobar";
                final UUID leaderId = UUID.randomUUID();
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
-               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService();
+               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService(
+                       null,
+                       null);
 
                highAvailabilityServices.setJobMasterLeaderRetriever(jobId, 
leaderRetrievalService);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index fb166d4..9a68eca 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
@@ -112,7 +113,9 @@ public class ResourceManagerJobMasterTest {
                String jobMasterAddress = "/jobMasterAddress1";
                JobID jobID = mockJobMaster(jobMasterAddress);
                TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
-               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService();
+               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+                       "localhost",
+                       HighAvailabilityServices.DEFAULT_LEADER_ID);
                TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
                final ResourceManager resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService, testingFatalErrorHandler);
                final UUID rmLeaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);
@@ -136,7 +139,9 @@ public class ResourceManagerJobMasterTest {
                String jobMasterAddress = "/jobMasterAddress1";
                JobID jobID = mockJobMaster(jobMasterAddress);
                TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
-               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService();
+               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+                       "localhost",
+                       HighAvailabilityServices.DEFAULT_LEADER_ID);
                TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
                final ResourceManager resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService, testingFatalErrorHandler);
                final UUID rmLeaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);
@@ -160,7 +165,9 @@ public class ResourceManagerJobMasterTest {
                String jobMasterAddress = "/jobMasterAddress1";
                JobID jobID = mockJobMaster(jobMasterAddress);
                TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
-               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService();
+               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+                       "localhost",
+                       HighAvailabilityServices.DEFAULT_LEADER_ID);
                TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
                final ResourceManager resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService, testingFatalErrorHandler);
                final UUID rmLeaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index e74ba29..f6c2dce 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -83,7 +83,7 @@ public class TaskExecutorITCase {
                final ResourceID taskManagerResourceId = new 
ResourceID("foobar");
                final UUID rmLeaderId = UUID.randomUUID();
                final TestingLeaderElectionService rmLeaderElectionService = 
new TestingLeaderElectionService();
-               final TestingLeaderRetrievalService rmLeaderRetrievalService = 
new TestingLeaderRetrievalService();
+               final TestingLeaderRetrievalService rmLeaderRetrievalService = 
new TestingLeaderRetrievalService(null, null);
                final String rmAddress = "rm";
                final String jmAddress = "jm";
                final UUID jmLeaderId = UUID.randomUUID();

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 5702eeb..67196aa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -119,8 +119,12 @@ public class TaskExecutorTest extends TestLogger {
                final TestingSerialRpcService rpc = new 
TestingSerialRpcService();
                final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
                final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
-               final TestingLeaderRetrievalService rmLeaderRetrievalService = 
new TestingLeaderRetrievalService();
-               final TestingLeaderRetrievalService jmLeaderRetrievalService = 
new TestingLeaderRetrievalService();
+               final TestingLeaderRetrievalService rmLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+                       null,
+                       null);
+               final TestingLeaderRetrievalService jmLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+                       null,
+                       null);
                haServices.setJobMasterLeaderRetriever(jobId, 
jmLeaderRetrievalService);
                
haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
 
@@ -296,7 +300,9 @@ public class TaskExecutorTest extends TestLogger {
                        rpc.registerGateway(address1, rmGateway1);
                        rpc.registerGateway(address2, rmGateway2);
 
-                       TestingLeaderRetrievalService testLeaderService = new 
TestingLeaderRetrievalService();
+                       TestingLeaderRetrievalService testLeaderService = new 
TestingLeaderRetrievalService(
+                               null,
+                               null);
 
                        TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
                        
haServices.setResourceManagerLeaderRetriever(testLeaderService);
@@ -514,8 +520,12 @@ public class TaskExecutorTest extends TestLogger {
                final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
                final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
 
-               final TestingLeaderRetrievalService 
resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService();
-               final TestingLeaderRetrievalService 
jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService();
+               final TestingLeaderRetrievalService 
resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService(
+                       null,
+                       null);
+               final TestingLeaderRetrievalService 
jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService(
+                       null,
+                       null);
                
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService);
                haServices.setJobMasterLeaderRetriever(jobId, 
jobManagerLeaderRetrievalService);
 
@@ -735,7 +745,9 @@ public class TaskExecutorTest extends TestLogger {
                        ResourceManagerGateway rmGateway1 = 
mock(ResourceManagerGateway.class);
                        rpc.registerGateway(address1, rmGateway1);
 
-                       TestingLeaderRetrievalService testLeaderService = new 
TestingLeaderRetrievalService();
+                       TestingLeaderRetrievalService testLeaderService = new 
TestingLeaderRetrievalService(
+                               "localhost",
+                               HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                        TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
                        
haServices.setResourceManagerLeaderRetriever(testLeaderService);

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index e234cba..f3b1d4a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmanager.JobManager;
@@ -567,7 +568,7 @@ public class TaskManagerRegistrationTest extends TestLogger 
{
                                final ActorRef taskManager = 
taskManagerGateway.actor();
 
                                final UUID falseLeaderSessionID = 
UUID.randomUUID();
-                               final UUID trueLeaderSessionID = null;
+                               final UUID trueLeaderSessionID = 
HighAvailabilityServices.DEFAULT_LEADER_ID;
 
                                new Within(timeout) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 730595c..a754cff 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.InstanceID;
@@ -125,7 +126,7 @@ public class TaskManagerTest extends TestLogger {
 
        private static ActorSystem system;
 
-       final static UUID leaderSessionID = null;
+       final static UUID leaderSessionID = 
HighAvailabilityServices.DEFAULT_LEADER_ID;
 
        @BeforeClass
        public static void setup() {
@@ -1165,8 +1166,8 @@ public class TaskManagerTest extends TestLogger {
 
                        // We need this to be a JM that answers to update 
messages for
                        // robustness on Travis (if jobs need to be resubmitted 
in (4)).
-                       ActorRef jm = system.actorOf(Props.create(new 
SimpleLookupJobManagerCreator(null)));
-                       ActorGateway jobManagerActorGateway = new 
AkkaActorGateway(jm, null);
+                       ActorRef jm = system.actorOf(Props.create(new 
SimpleLookupJobManagerCreator(leaderSessionID)));
+                       ActorGateway jobManagerActorGateway = new 
AkkaActorGateway(jm, leaderSessionID);
 
                        final ActorGateway testActorGateway = new 
AkkaActorGateway(
                                        getTestActor(),

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties 
b/flink-runtime/src/test/resources/log4j-test.properties
index 7ba1633..98f136a 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 
-log4j.rootLogger=OFF, console
+log4j.rootLogger=INFO, console
 
 # -----------------------------------------------------------------------------
 # Console (use 'console')

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index dfcbf77..76d9591 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -27,12 +27,12 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.instance._
 import 
org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.PlainForwardingActor
 import 
org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
 import 
org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration,
 AlreadyRegistered, RegisterTaskManager}
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation
-
 import org.apache.flink.runtime.testutils.TestingResourceManager
 import org.apache.flink.runtime.util.LeaderRetrievalUtils
 import org.junit.Assert.{assertNotEquals, assertNotNull}
@@ -87,7 +87,7 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
             connectionInfo1,
             hardwareDescription,
             1),
-          new AkkaActorGateway(tm1, null))
+          new AkkaActorGateway(tm1, 
HighAvailabilityServices.DEFAULT_LEADER_ID))
 
         val response = expectMsgType[LeaderSessionMessage]
         response match {
@@ -104,7 +104,7 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
             connectionInfo2,
             hardwareDescription,
             1),
-          new AkkaActorGateway(tm2, null))
+          new AkkaActorGateway(tm2, 
HighAvailabilityServices.DEFAULT_LEADER_ID))
 
         val response = expectMsgType[LeaderSessionMessage]
         response match {
@@ -123,7 +123,7 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
       val jm = startTestingJobManager(_system)
       val rm = startTestingResourceManager(_system, jm.actor())
 
-      val selfGateway = new AkkaActorGateway(testActor, null)
+      val selfGateway = new AkkaActorGateway(testActor, 
HighAvailabilityServices.DEFAULT_LEADER_ID)
 
       val resourceID = ResourceID.generate()
       val connectionInfo = new TaskManagerLocation(resourceID, 
InetAddress.getLocalHost, 1)
@@ -155,17 +155,23 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
           selfGateway)
 
         expectMsgType[LeaderSessionMessage] match {
-          case LeaderSessionMessage(null, AcknowledgeRegistration(_, _)) =>
+          case LeaderSessionMessage(
+            HighAvailabilityServices.DEFAULT_LEADER_ID,
+            AcknowledgeRegistration(_, _)) =>
           case m => fail("Wrong message type: " + m)
         }
 
         expectMsgType[LeaderSessionMessage] match {
-          case LeaderSessionMessage(null, AlreadyRegistered(_, _)) =>
+          case LeaderSessionMessage(
+            HighAvailabilityServices.DEFAULT_LEADER_ID,
+            AlreadyRegistered(_, _)) =>
           case m => fail("Wrong message type: " + m)
         }
 
         expectMsgType[LeaderSessionMessage] match {
-          case LeaderSessionMessage(null, AlreadyRegistered(_, _)) =>
+          case LeaderSessionMessage(
+            HighAvailabilityServices.DEFAULT_LEADER_ID,
+            AlreadyRegistered(_, _)) =>
           case m => fail("Wrong message type: " + m)
         }
       }
@@ -182,7 +188,7 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
       None,
       classOf[JobManager],
       classOf[MemoryArchivist])
-    new AkkaActorGateway(jm, null)
+    new AkkaActorGateway(jm, HighAvailabilityServices.DEFAULT_LEADER_ID)
   }
 
   private def startTestingResourceManager(system: ActorSystem, jm: ActorRef): 
ActorGateway = {
@@ -193,7 +199,7 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
       _system,
       LeaderRetrievalUtils.createLeaderRetrievalService(config, jm),
       classOf[TestingResourceManager])
-    new AkkaActorGateway(rm, null)
+    new AkkaActorGateway(rm, HighAvailabilityServices.DEFAULT_LEADER_ID)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/ScalaTestingUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/ScalaTestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/ScalaTestingUtils.scala
index d46dd71..d4ca8f6 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/ScalaTestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/ScalaTestingUtils.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.testingUtils
 
 import akka.actor.ActorRef
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
 
 /** Mixing for testing utils
@@ -32,6 +33,6 @@ trait ScalaTestingUtils {
     * @return [[ActorGateway]] encapsulating the given [[ActorRef]]
     */
   implicit def actorRef2InstanceGateway(actor: ActorRef): ActorGateway = {
-    new AkkaActorGateway(actor, null)
+    new AkkaActorGateway(actor, HighAvailabilityServices.DEFAULT_LEADER_ID)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 20260c7..d6221f5 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.JobClient
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
 import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
@@ -45,7 +46,7 @@ import org.apache.flink.runtime.{FlinkActor, 
LeaderSessionMessageFilter, LogMess
 
 import scala.concurrent.duration.TimeUnit
 import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContextExecutor, Await, ExecutionContext}
+import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor}
 import scala.language.postfixOps
 
 /**
@@ -326,7 +327,7 @@ object TestingUtils {
       Await.ready(notificationResult, TESTING_DURATION)
     }
 
-    new AkkaActorGateway(taskManager, null)
+    new AkkaActorGateway(taskManager, 
HighAvailabilityServices.DEFAULT_LEADER_ID)
   }
 
   /** Stops the given actor by sending it a Kill message
@@ -456,7 +457,7 @@ object TestingUtils {
         jobManagerClass,
         classOf[MemoryArchivist])
 
-    new AkkaActorGateway(actor, null)
+    new AkkaActorGateway(actor, HighAvailabilityServices.DEFAULT_LEADER_ID)
   }
 
   /** Creates a forwarding JobManager which sends all received message to the 
forwarding target.
@@ -478,7 +479,7 @@ object TestingUtils {
           Props(
             classOf[ForwardingActor],
             forwardingTarget,
-            None),
+            Option(HighAvailabilityServices.DEFAULT_LEADER_ID)),
           name
         )
       case None =>
@@ -486,11 +487,11 @@ object TestingUtils {
           Props(
             classOf[ForwardingActor],
             forwardingTarget,
-            None)
+            Option(HighAvailabilityServices.DEFAULT_LEADER_ID))
         )
     }
 
-    new AkkaActorGateway(actor, null)
+    new AkkaActorGateway(actor, HighAvailabilityServices.DEFAULT_LEADER_ID)
   }
 
   def submitJobAndWait(
@@ -537,7 +538,7 @@ object TestingUtils {
       LeaderRetrievalUtils.createLeaderRetrievalService(configuration, 
jobManager),
       classOf[TestingResourceManager])
 
-    new AkkaActorGateway(actor, null)
+    new AkkaActorGateway(actor, HighAvailabilityServices.DEFAULT_LEADER_ID)
   }
 
 

Reply via email to