[FLINK-4851] [rm] Introduce FatalErrorHandler and MetricRegistry to RM

This PR introduces a FatalErrorHandler and the MetricRegistry to the RM. The
FatalErrorHandler is used to handle fatal errors, and the MetricRegistry can be
used to register metrics.

Apart from these changes, the PR restructures the RM code a little and fixes
some blocking operations.

The PR also moves the TestingFatalErrorHandler into the util package of the
flink-runtime tests so that it is usable across multiple tests.

Introduce ResourceManagerRunner to handle errors in the ResourceManager

This closes #2655.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f38bf448
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f38bf448
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f38bf448

Branch: refs/heads/flip-6
Commit: f38bf4484df3e022f389357059ca984c5e3f76a6
Parents: f63f972
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Oct 17 16:03:02 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Sun Oct 23 11:11:52 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/minicluster/MiniCluster.java  |  35 +-
 .../resourcemanager/ResourceManager.java        | 463 +++++++++++--------
 .../ResourceManagerConfiguration.java           |  86 ++++
 .../StandaloneResourceManager.java              |  25 +-
 .../exceptions/ConfigurationException.java      |  38 ++
 .../exceptions/ResourceManagerException.java    |  40 ++
 .../exceptions/ResourceManagerRunner.java       | 102 ++++
 .../registration/WorkerRegistration.java        |   5 +-
 .../resourcemanager/ResourceManagerHATest.java  |  45 +-
 .../ResourceManagerJobMasterTest.java           |  59 ++-
 .../ResourceManagerTaskExecutorTest.java        |  87 ++--
 .../resourcemanager/TestingResourceManager.java |  53 ---
 .../TestingSlotManagerFactory.java              |  30 ++
 .../slotmanager/SlotProtocolTest.java           |  47 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |  60 +--
 .../runtime/util/TestingFatalErrorHandler.java  |  83 ++++
 16 files changed, 863 insertions(+), 395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f38bf448/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index d63f9a7..b005330 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -32,10 +32,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerRunner;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
@@ -52,7 +49,6 @@ import static 
org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
-
 public class MiniCluster {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(MiniCluster.class);
@@ -82,7 +78,7 @@ public class MiniCluster {
        private HighAvailabilityServices haServices;
 
        @GuardedBy("lock")
-       private ResourceManager<?>[] resourceManagers;
+       private ResourceManagerRunner[] resourceManagerRunners;
 
        @GuardedBy("lock")
        private TaskManagerRunner[] taskManagerRunners;
@@ -231,7 +227,7 @@ public class MiniCluster {
 
                                // bring up the ResourceManager(s)
                                LOG.info("Starting {} ResourceManger(s)", 
numResourceManagers);
-                               resourceManagers = startResourceManagers(
+                               resourceManagerRunners = startResourceManagers(
                                                configuration, haServices, 
metricRegistry, numResourceManagers, resourceManagerRpcServices);
 
                                // bring up the TaskManager(s) for the mini 
cluster
@@ -303,8 +299,8 @@ public class MiniCluster {
                        jobDispatcher = null;
                }
 
-               if (resourceManagers != null) {
-                       for (ResourceManager<?> rm : resourceManagers) {
+               if (resourceManagerRunners != null) {
+                       for (ResourceManagerRunner rm : resourceManagerRunners) 
{
                                if (rm != null) {
                                        try {
                                                rm.shutDown();
@@ -313,7 +309,7 @@ public class MiniCluster {
                                        }
                                }
                        }
-                       resourceManagers = null;
+                       resourceManagerRunners = null;
                }
 
                // shut down the RpcServices
@@ -435,26 +431,27 @@ public class MiniCluster {
                return new AkkaRpcService(actorSystem, askTimeout);
        }
 
-       protected ResourceManager<?>[] startResourceManagers(
+       protected ResourceManagerRunner[] startResourceManagers(
                        Configuration configuration,
                        HighAvailabilityServices haServices,
                        MetricRegistry metricRegistry,
                        int numResourceManagers,
                        RpcService[] resourceManagerRpcServices) throws 
Exception {
 
-               final StandaloneResourceManager[] resourceManagers = new 
StandaloneResourceManager[numResourceManagers];
-               final SlotManagerFactory slotManagerFactory = new 
DefaultSlotManager.Factory(); 
+               final ResourceManagerRunner[] resourceManagerRunners = new 
ResourceManagerRunner[numResourceManagers];
 
                for (int i = 0; i < numResourceManagers; i++) {
-                       resourceManagers[i] = new StandaloneResourceManager(
-                                       resourceManagerRpcServices[i],
-                                       haServices,
-                                       slotManagerFactory);
 
-                       resourceManagers[i].start();
+                       resourceManagerRunners[i] = new ResourceManagerRunner(
+                               configuration,
+                               resourceManagerRpcServices[i],
+                               haServices,
+                               metricRegistry);
+
+                       resourceManagerRunners[i].start();
                }
 
-               return resourceManagers;
+               return resourceManagerRunners;
        }
 
        protected TaskManagerRunner[] startTaskManagers(

http://git-wip-us.apache.org/repos/asf/flink/blob/f38bf448/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index f1a5073..4161972 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -38,12 +38,15 @@ import 
org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected;
 import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
 import 
org.apache.flink.runtime.resourcemanager.registration.JobMasterRegistration;
 import 
org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -54,6 +57,7 @@ import 
org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.util.ExceptionUtils;
 
 import java.io.Serializable;
 import java.util.HashMap;
@@ -61,6 +65,8 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -79,8 +85,8 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                extends RpcEndpoint<ResourceManagerGateway>
                implements LeaderContender {
 
-       /** The exit code with which the process is stopped in case of a fatal 
error. */
-       protected static final int EXIT_CODE_FATAL_ERROR = -13;
+       /** Configuration of the resource manager */
+       private final ResourceManagerConfiguration resourceManagerConfiguration;
 
        /** All currently registered JobMasterGateways scoped by JobID. */
        private final Map<JobID, JobMasterRegistration> jobMasters;
@@ -97,6 +103,12 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        /** The factory to construct the SlotManager. */
        private final SlotManagerFactory slotManagerFactory;
 
+       /** Registry to use for metrics */
+       private final MetricRegistry metricRegistry;
+
+       /** Fatal error handler */
+       private final FatalErrorHandler fatalErrorHandler;
+
        /** The SlotManager created by the slotManagerFactory when the 
ResourceManager is started. */
        private SlotManager slotManager;
 
@@ -107,64 +119,89 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        private volatile UUID leaderSessionID;
 
        /** All registered listeners for status updates of the ResourceManager. 
*/
-       private Map<String, InfoMessageListenerRpcGateway> infoMessageListeners;
-
-       /** Default timeout for messages */
-       private final Time timeout = Time.seconds(5);
+       private ConcurrentMap<String, InfoMessageListenerRpcGateway> 
infoMessageListeners;
 
        public ResourceManager(
                        RpcService rpcService,
+                       ResourceManagerConfiguration 
resourceManagerConfiguration,
                        HighAvailabilityServices highAvailabilityServices,
-                       SlotManagerFactory slotManagerFactory) {
+                       SlotManagerFactory slotManagerFactory,
+                       MetricRegistry metricRegistry,
+                       FatalErrorHandler fatalErrorHandler) {
+
                super(rpcService);
+
+               this.resourceManagerConfiguration = 
checkNotNull(resourceManagerConfiguration);
                this.highAvailabilityServices = 
checkNotNull(highAvailabilityServices);
                this.slotManagerFactory = checkNotNull(slotManagerFactory);
-               this.jobMasters = new HashMap<>();
-               this.leaderListeners = new HashMap<>();
-               this.taskExecutors = new HashMap<>();
-               this.leaderSessionID = new UUID(0, 0);
-               infoMessageListeners = new HashMap<>();
+               this.metricRegistry = checkNotNull(metricRegistry);
+               this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
+
+               this.jobMasters = new HashMap<>(4);
+               this.leaderListeners = new HashMap<>(4);
+               this.taskExecutors = new HashMap<>(8);
+               this.leaderSessionID = null;
+               infoMessageListeners = new ConcurrentHashMap<>(8);
        }
 
+       // 
------------------------------------------------------------------------
+       //  RPC lifecycle methods
+       // 
------------------------------------------------------------------------
+
        @Override
-       public void start() {
+       public void start() throws Exception {
                // start a leader
+               super.start();
+
                try {
-                       super.start();
                        // SlotManager should start first
                        slotManager = 
slotManagerFactory.create(createResourceManagerServices());
+               } catch (Exception e) {
+                       throw new ResourceManagerException("Could not create 
the slot manager.", e);
+               }
+
+               try {
                        leaderElectionService = 
highAvailabilityServices.getResourceManagerLeaderElectionService();
                        leaderElectionService.start(this);
+               } catch (Exception e) {
+                       throw new ResourceManagerException("Could not start the 
leader election service.", e);
+               }
+
+               try {
                        // framework specific initialization
                        initialize();
-               } catch (Throwable e) {
-                       log.error("A fatal error happened when starting the 
ResourceManager", e);
-                       throw new RuntimeException("A fatal error happened when 
starting the ResourceManager", e);
+               } catch (Exception e) {
+                       throw new ResourceManagerException("Could not 
initialize the resource manager.", e);
                }
        }
 
        @Override
-       public void shutDown() {
+       public void shutDown() throws Exception {
+               Exception exception = null;
+
                try {
                        leaderElectionService.stop();
-                       clearState();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+               }
+
+               clearState();
+
+               try {
                        super.shutDown();
-               } catch (Throwable e) {
-                       log.error("A fatal error happened when shutdown the 
ResourceManager", e);
-                       throw new RuntimeException("A fatal error happened when 
shutdown the ResourceManager", e);
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                }
-       }
 
-       /**
-        * Gets the leader session id of current resourceManager.
-        *
-        * @return return the leaderSessionId of current resourceManager, this 
returns null until the current resourceManager is granted leadership.
-        */
-       @VisibleForTesting
-       UUID getLeaderSessionID() {
-               return this.leaderSessionID;
+               if (exception != null) {
+                       ExceptionUtils.rethrowException(exception, "Error while 
shutting the ResourceManager down.");
+               }
        }
 
+       // 
------------------------------------------------------------------------
+       //  RPC methods
+       // 
------------------------------------------------------------------------
+
        /**
         * Register a {@link JobMaster} at the resource manager.
         *
@@ -191,11 +228,12 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                                        
highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
                                jobIdLeaderListener = new 
JobIdLeaderListener(jobID, jobMasterLeaderRetriever);
                        } catch (Exception e) {
-                               log.warn("Failed to start 
JobMasterLeaderRetriever for JobID {}", jobID);
-                               FlinkCompletableFuture<RegistrationResponse> 
responseFuture = new FlinkCompletableFuture<>();
-                               responseFuture.complete(new 
RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"));
-                               return responseFuture;
+                               log.warn("Failed to start 
JobMasterLeaderRetriever for job id {}", jobID, e);
+
+                               return 
FlinkCompletableFuture.<RegistrationResponse>completed(
+                                       new 
RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"));
                        }
+
                        leaderListeners.put(jobID, jobIdLeaderListener);
                }
 
@@ -211,6 +249,8 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                                                throw new Exception("Invalid 
leader session id");
                                        }
 
+                                       final Time timeout = 
resourceManagerConfiguration.getTimeout();
+
                                        if 
(!jobIdLeaderListener.getLeaderID().get(timeout.getSize(), timeout.getUnit())
                                                        
.equals(jobMasterLeaderId)) {
                                                throw new Exception("Leader Id 
did not match");
@@ -224,10 +264,9 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                                @Override
                                public RegistrationResponse 
apply(JobMasterGateway jobMasterGateway, Throwable throwable) {
 
-                                       if (throwable != null) {
-                                               return new 
RegistrationResponse.Decline(throwable.getMessage());
-                                       }
-
+                               if (throwable != null) {
+                                       return new 
RegistrationResponse.Decline(throwable.getMessage());
+                               } else {
                                        if 
(!leaderSessionID.equals(resourceManagerLeaderId)) {
                                                log.warn("Discard registration 
from JobMaster {} at ({}) because the expected leader session ID {}" +
                                                                " did not equal 
the received leader session ID  {}",
@@ -252,10 +291,12 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                                        if (existingRegistration != null) {
                                                log.info("Replacing JobMaster 
registration for newly registered JobMaster with JobID {}.", jobID);
                                        }
-                                       return new 
JobMasterRegistrationSuccess(5000, resourceManagerLeaderId);
-
+                                       return new JobMasterRegistrationSuccess(
+                                               
resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds(),
+                                               resourceManagerLeaderId);
                                }
-                       }, getMainThreadExecutor());
+                       }
+               }, getMainThreadExecutor());
        }
 
        /**
@@ -274,38 +315,44 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                final ResourceID resourceID,
                final SlotReport slotReport) {
 
-               return getRpcService().execute(new 
Callable<TaskExecutorGateway>() {
-                       @Override
-                       public TaskExecutorGateway call() throws Exception {
-                               if 
(!leaderSessionID.equals(resourceManagerLeaderId)) {
-                                       log.warn("Discard registration from 
TaskExecutor {} at ({}) because the expected leader session ID {} did " +
-                                                       "not equal the received 
leader session ID  {}",
-                                               resourceID, 
taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
-                                       throw new Exception("Invalid leader 
session id");
-                               }
-                               return 
getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class)
-                                       .get(timeout.toMilliseconds(), 
timeout.getUnit());
-                       }
-               }).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, 
RegistrationResponse>() {
-                       @Override
-                       public RegistrationResponse apply(TaskExecutorGateway 
taskExecutorGateway, Throwable throwable) {
-                               if (throwable != null) {
-                                       return new 
RegistrationResponse.Decline(throwable.getMessage());
-                               } else {
-                                       WorkerRegistration oldRegistration = 
taskExecutors.remove(resourceID);
-                                       if (oldRegistration != null) {
-                                               // TODO :: suggest old 
taskExecutor to stop itself
-                                               log.info("Replacing old 
instance of worker for ResourceID {}", resourceID);
+               if (leaderSessionID.equals(resourceManagerLeaderId)) {
+                       Future<TaskExecutorGateway> taskExecutorGatewayFuture = 
getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
+
+                       return taskExecutorGatewayFuture.handleAsync(new 
BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
+                               @Override
+                               public RegistrationResponse 
apply(TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
+                                       if (throwable != null) {
+                                               return new 
RegistrationResponse.Decline(throwable.getMessage());
+                                       } else {
+                                               WorkerRegistration<WorkerType> 
oldRegistration = taskExecutors.remove(resourceID);
+                                               if (oldRegistration != null) {
+                                                       // TODO :: suggest old 
taskExecutor to stop itself
+                                                       log.info("Replacing old 
instance of worker for ResourceID {}", resourceID);
+                                               }
+
+                                               WorkerType newWorker = 
workerStarted(resourceID);
+                                               WorkerRegistration<WorkerType> 
registration =
+                                                       new 
WorkerRegistration<>(taskExecutorGateway, newWorker);
+
+                                               taskExecutors.put(resourceID, 
registration);
+                                               
slotManager.registerTaskExecutor(resourceID, registration, slotReport);
+
+                                               return new 
TaskExecutorRegistrationSuccess(
+                                                       
registration.getInstanceID(),
+                                                       
resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds());
                                        }
-                                       WorkerType newWorker = 
workerStarted(resourceID);
-                                       WorkerRegistration<WorkerType> 
registration =
-                                               new 
WorkerRegistration<>(taskExecutorGateway, newWorker);
-                                       taskExecutors.put(resourceID, 
registration);
-                                       
slotManager.registerTaskExecutor(resourceID, registration, slotReport);
-                                       return new 
TaskExecutorRegistrationSuccess(registration.getInstanceID(), 5000);
                                }
-                       }
-               }, getMainThreadExecutor());
+                       }, getMainThreadExecutor());
+               } else {
+                       log.warn("Discard registration from TaskExecutor {} at 
({}) because the expected leader session ID {} did " +
+                                       "not equal the received leader session 
ID  {}",
+                               resourceID, taskExecutorAddress, 
leaderSessionID, resourceManagerLeaderId);
+
+                       return 
FlinkCompletableFuture.<RegistrationResponse>completed(
+                               new RegistrationResponse.Decline("Discard 
registration because the leader id " +
+                                       resourceManagerLeaderId + " does not 
match the expected leader id " +
+                                       leaderSessionID + '.'));
+               }
        }
 
        /**
@@ -337,33 +384,91 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
         * Notification from a TaskExecutor that a slot has become available
         * @param resourceManagerLeaderId TaskExecutor's resource manager 
leader id
         * @param instanceID TaskExecutor's instance id
-        * @param slotID The slot id of the available slot
+        * @param slotId The slot id of the available slot
         * @return SlotAvailableReply
         */
        @RpcMethod
        public void notifySlotAvailable(
                        final UUID resourceManagerLeaderId,
                        final InstanceID instanceID,
-                       final SlotID slotID) {
+                       final SlotID slotId) {
 
                if (resourceManagerLeaderId.equals(leaderSessionID)) {
-                       final ResourceID resourceId = slotID.getResourceID();
+                       final ResourceID resourceId = slotId.getResourceID();
                        WorkerRegistration<WorkerType> registration = 
taskExecutors.get(resourceId);
 
                        if (registration != null) {
-                               InstanceID registrationInstanceID = 
registration.getInstanceID();
-                               if (registrationInstanceID.equals(instanceID)) {
-                                       runAsync(new Runnable() {
-                                               @Override
-                                               public void run() {
-                                                       
slotManager.notifySlotAvailable(resourceId, slotID);
-                                               }
-                                       });
+                               InstanceID registrationId = 
registration.getInstanceID();
+
+                               if (registrationId.equals(instanceID)) {
+                                       
slotManager.notifySlotAvailable(resourceId, slotId);
+                               } else {
+                                       log.debug("Invalid registration id for 
slot available message. This indicates an" +
+                                               " outdated request.");
                                }
+                       } else {
+                               log.debug("Could not find registration for 
resource id {}. Discarding the slot available" +
+                                       "message {}.", resourceId, slotId);
                        }
+               } else {
+                       log.debug("Discarding notify slot available message for 
slot {}, because the " +
+                               "leader id {} did not match the expected leader 
id {}.", slotId,
+                               resourceManagerLeaderId, leaderSessionID);
+               }
+       }
+
+       /**
+        * Registers an info message listener
+        *
+        * @param address address of infoMessage listener to register to this 
resource manager
+        */
+       @RpcMethod
+       public void registerInfoMessageListener(final String address) {
+               if(infoMessageListeners.containsKey(address)) {
+                       log.warn("Receive a duplicate registration from info 
message listener on ({})", address);
+               } else {
+                       Future<InfoMessageListenerRpcGateway> 
infoMessageListenerRpcGatewayFuture = getRpcService().connect(address, 
InfoMessageListenerRpcGateway.class);
+
+                       infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new 
AcceptFunction<InfoMessageListenerRpcGateway>() {
+                               @Override
+                               public void 
accept(InfoMessageListenerRpcGateway gateway) {
+                                       log.info("Receive a registration from 
info message listener on ({})", address);
+                                       infoMessageListeners.put(address, 
gateway);
+                               }
+                       }, getMainThreadExecutor());
+
+                       
infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new 
ApplyFunction<Throwable, Void>() {
+                               @Override
+                               public Void apply(Throwable failure) {
+                                       log.warn("Receive a registration from 
unreachable info message listener on ({})", address);
+                                       return null;
+                               }
+                       }, getRpcService().getExecutor());
                }
        }
 
+       /**
+        * Unregisters an info message listener
+        *
+        * @param address of the  info message listener to unregister from this 
resource manager
+        *
+        */
+       @RpcMethod
+       public void unRegisterInfoMessageListener(final String address) {
+               infoMessageListeners.remove(address);
+       }
+
+       /**
+        * Cleanup application and shut down cluster
+        *
+        * @param finalStatus
+        * @param optionalDiagnostics
+        */
+       @RpcMethod
+       public void shutDownCluster(final ApplicationStatus finalStatus, final 
String optionalDiagnostics) {
+               log.info("shut down cluster because application is in {}, 
diagnostics {}", finalStatus, optionalDiagnostics);
+               shutDownApplication(finalStatus, optionalDiagnostics);
+       }
 
        // 
------------------------------------------------------------------------
        //  Leader Contender
@@ -372,23 +477,35 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        /**
         * Callback method when current resourceManager is granted leadership
         *
-        * @param leaderSessionID unique leadershipID
+        * @param newLeaderSessionID unique leadershipID
         */
        @Override
-       public void grantLeadership(final UUID leaderSessionID) {
+       public void grantLeadership(final UUID newLeaderSessionID) {
                runAsync(new Runnable() {
                        @Override
                        public void run() {
-                               log.info("ResourceManager {} was granted 
leadership with leader session ID {}", getAddress(), leaderSessionID);
-                               // confirming the leader session ID might be 
blocking,
-                               
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
-                               ResourceManager.this.leaderSessionID = 
leaderSessionID;
+                               log.info("ResourceManager {} was granted 
leadership with leader session ID {}", getAddress(), newLeaderSessionID);
+
+                               // clear the state if we've been the leader 
before
+                               if (leaderSessionID != null) {
+                                       clearState();
+                               }
+
+                               leaderSessionID = newLeaderSessionID;
+
+                               getRpcService().execute(new Runnable() {
+                                       @Override
+                                       public void run() {
+                                               // confirming the leader 
session ID might be blocking,
+                                               
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
+                                       }
+                               });
                        }
                });
        }
 
        /**
-        * Callback method when current resourceManager lose leadership.
+        * Callback method when current resourceManager loses leadership.
         */
        @Override
        public void revokeLeadership() {
@@ -396,7 +513,10 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                        @Override
                        public void run() {
                                log.info("ResourceManager {} was revoked 
leadership.", getAddress());
+
                                clearState();
+
+                               leaderSessionID = null;
                        }
                });
        }
@@ -408,106 +528,98 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
         */
        @Override
        public void handleError(final Exception exception) {
-               log.error("ResourceManager received an error from the 
LeaderElectionService.", exception);
-               // terminate ResourceManager in case of an error
-               shutDown();
+               onFatalErrorAsync(new ResourceManagerException("Received an 
error from the LeaderElectionService.", exception));
        }
 
        /**
-        * Registers an infoMessage listener
+        * This method should be called by the framework once it detects that a 
currently registered
+        * task executor has failed.
         *
-        * @param infoMessageListenerAddress address of infoMessage listener to 
register to this resource manager
+        * @param resourceID Id of the worker that has failed.
+        * @param message An informational message that explains why the worker 
failed.
         */
-       @RpcMethod
-       public void registerInfoMessageListener(final String 
infoMessageListenerAddress) {
-               
if(infoMessageListeners.containsKey(infoMessageListenerAddress)) {
-                       log.warn("Receive a duplicate registration from info 
message listener on ({})", infoMessageListenerAddress);
-               } else {
-                       Future<InfoMessageListenerRpcGateway> 
infoMessageListenerRpcGatewayFuture = 
getRpcService().connect(infoMessageListenerAddress, 
InfoMessageListenerRpcGateway.class);
-
-                       infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new 
AcceptFunction<InfoMessageListenerRpcGateway>() {
-                               @Override
-                               public void 
accept(InfoMessageListenerRpcGateway gateway) {
-                                       log.info("Receive a registration from 
info message listener on ({})", infoMessageListenerAddress);
-                                       
infoMessageListeners.put(infoMessageListenerAddress, gateway);
-                               }
-                       }, getMainThreadExecutor());
+       public void notifyWorkerFailed(final ResourceID resourceID, final 
String message) {
+               runAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               WorkerRegistration<WorkerType> 
workerRegistration = taskExecutors.remove(resourceID);
 
-                       
infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new 
ApplyFunction<Throwable, Void>() {
-                               @Override
-                               public Void apply(Throwable failure) {
-                                       log.warn("Receive a registration from 
unreachable info message listener on ({})", infoMessageListenerAddress);
-                                       return null;
+                               if (workerRegistration != null) {
+                                       log.info("Task manager {} failed 
because {}.", resourceID, message);
+                                       // TODO :: suggest failed task executor 
to stop itself
+                                       
slotManager.notifyTaskManagerFailure(resourceID);
+                               } else {
+                                       log.debug("Could not find a registered 
task manager with the process id {}.", resourceID);
                                }
-                       }, getMainThreadExecutor());
-               }
-       }
-
-       /**
-        * Unregisters an infoMessage listener
-        *
-        * @param infoMessageListenerAddress address of infoMessage listener to 
unregister from this resource manager
-        *
-        */
-       @RpcMethod
-       public void unRegisterInfoMessageListener(final String 
infoMessageListenerAddress) {
-               infoMessageListeners.remove(infoMessageListenerAddress);
+                       }
+               });
        }
 
-       /**
-        * Cleanup application and shut down cluster
-        *
-        * @param finalStatus
-        * @param optionalDiagnostics
-        */
-       @RpcMethod
-       public void shutDownCluster(final ApplicationStatus finalStatus, final 
String optionalDiagnostics) {
-               log.info("shut down cluster because application is in {}, 
diagnostics {}", finalStatus, optionalDiagnostics);
-               shutDownApplication(finalStatus, optionalDiagnostics);
-       }
+       // 
------------------------------------------------------------------------
+       //  Error Handling
+       // 
------------------------------------------------------------------------
 
        /**
-        * This method should be called by the framework once it detects that a 
currently registered task executor has failed.
+        * Notifies the ResourceManager that a fatal error has occurred and it 
cannot proceed.
+        * This method should be used when asynchronous threads want to notify 
the
+        * ResourceManager of a fatal error.
         *
-        * @param resourceID Id of the worker that has failed.
-        * @param message An informational message that explains why the worker 
failed.
+        * @param t The exception describing the fatal error
         */
-       public void notifyWorkerFailed(final ResourceID resourceID, String 
message) {
+       void onFatalErrorAsync(final Throwable t) {
                runAsync(new Runnable() {
                        @Override
                        public void run() {
-                               WorkerType worker = 
taskExecutors.remove(resourceID).getWorker();
-                               if (worker != null) {
-                                       // TODO :: suggest failed task executor 
to stop itself
-                                       
slotManager.notifyTaskManagerFailure(resourceID);
-                               }
+                               onFatalError(t);
                        }
                });
        }
 
        /**
-        * Gets the number of currently started TaskManagers.
+        * Notifies the ResourceManager that a fatal error has occurred and it 
cannot proceed.
+        * This method must only be called from within the ResourceManager's 
main thread.
         *
-        * @return The number of currently started TaskManagers.
+        * @param t The exception describing the fatal error
         */
-       public int getNumberOfStartedTaskManagers() {
-               return taskExecutors.size();
+       void onFatalError(Throwable t) {
+               log.error("Fatal error occurred.", t);
+               fatalErrorHandler.onFatalError(t);
        }
 
+       // 
------------------------------------------------------------------------
+       //  Testing methods
+       // 
------------------------------------------------------------------------
+
        /**
-        * Notifies the resource manager of a fatal error.
+        * Gets the leader session id of current resourceManager.
         *
-        * <p><b>IMPORTANT:</b> This should not cleanly shut down this master, 
but exit it in
-        * such a way that a high-availability setting would restart this or 
fail over
-        * to another master.
+        * @return return the leaderSessionId of current resourceManager, this 
returns null until the current resourceManager is granted leadership.
         */
-       public void onFatalError(final String message, final Throwable error) {
-               runAsync(new Runnable() {
-                       @Override
-                       public void run() {
-                               fatalError(message, error);
+       @VisibleForTesting
+       UUID getLeaderSessionID() {
+               return leaderSessionID;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Internal methods
+       // 
------------------------------------------------------------------------
+
+       private void clearState() {
+               jobMasters.clear();
+               taskExecutors.clear();
+               slotManager.clearState();
+               Iterator<JobIdLeaderListener> leaderListenerIterator =
+                       leaderListeners.values().iterator();
+               while (leaderListenerIterator.hasNext()) {
+                       JobIdLeaderListener listener = 
leaderListenerIterator.next();
+                       try {
+                               listener.stopService();
+                       } catch (Exception e) {
+                               onFatalError(e);
                        }
-               });
+                       leaderListenerIterator.remove();
+               }
+               leaderSessionID = new UUID(0, 0);
        }
 
        // 
------------------------------------------------------------------------
@@ -522,15 +634,6 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        protected abstract void initialize() throws Exception;
 
        /**
-        * Notifies the resource master of a fatal error.
-        *
-        * <p><b>IMPORTANT:</b> This should not cleanly shut down this master, 
but exit it in
-        * such a way that a high-availability setting would restart this or 
fail over
-        * to another master.
-        */
-       protected abstract void fatalError(String message, Throwable error);
-
-       /**
         * The framework specific code for shutting down the application. This 
should report the
         * application's final status and shut down the resource manager 
cleanly.
         *
@@ -560,7 +663,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        // 
------------------------------------------------------------------------
 
        public void sendInfoMessage(final String message) {
-               runAsync(new Runnable() {
+               getRpcService().execute(new Runnable() {
                        @Override
                        public void run() {
                                InfoMessage infoMessage = new 
InfoMessage(message);
@@ -675,23 +778,5 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                        ResourceManager.this.handleError(exception);
                }
        }
-
-       private void clearState() {
-               jobMasters.clear();
-               taskExecutors.clear();
-               slotManager.clearState();
-               Iterator<JobIdLeaderListener> leaderListenerIterator =
-                       leaderListeners.values().iterator();
-               while (leaderListenerIterator.hasNext()) {
-                       JobIdLeaderListener listener = 
leaderListenerIterator.next();
-                       try {
-                               listener.stopService();
-                       } catch (Exception e) {
-                               handleError(e);
-                       }
-                       leaderListenerIterator.remove();
-               }
-               leaderSessionID = new UUID(0, 0);
-       }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f38bf448/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
new file mode 100644
index 0000000..920f1fc
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Resource manager configuration, holding the timeout and the heartbeat interval
+ * of the resource manager.
+ */
+public class ResourceManagerConfiguration {
+
+       private final Time timeout;
+       private final Time heartbeatInterval;
+
+       /**
+        * Creates a new resource manager configuration.
+        *
+        * @param timeout the timeout; must not be null
+        * @param heartbeatInterval the heartbeat interval; must not be null
+        */
+       public ResourceManagerConfiguration(Time timeout, Time heartbeatInterval) {
+               this.timeout = Preconditions.checkNotNull(timeout);
+               this.heartbeatInterval = Preconditions.checkNotNull(heartbeatInterval);
+       }
+
+       public Time getTimeout() {
+               return timeout;
+       }
+
+       public Time getHeartbeatInterval() {
+               return heartbeatInterval;
+       }
+
+       // --------------------------------------------------------------------------
+       // Static factory methods
+       // --------------------------------------------------------------------------
+
+       /**
+        * Creates a {@link ResourceManagerConfiguration} from the given Flink configuration.
+        *
+        * <p>The timeout is read from {@link ConfigConstants#AKKA_ASK_TIMEOUT} and defaults to
+        * {@link ConfigConstants#DEFAULT_AKKA_ASK_TIMEOUT}. The heartbeat interval is read from
+        * {@link ConfigConstants#AKKA_WATCH_HEARTBEAT_INTERVAL} and defaults to the timeout.
+        *
+        * @param configuration the configuration to read the values from
+        * @return the resource manager configuration
+        * @throws ConfigurationException if a value cannot be parsed into a finite duration
+        */
+       public static ResourceManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
+               ConfigOption<String> timeoutOption = ConfigOptions
+                       .key(ConfigConstants.AKKA_ASK_TIMEOUT)
+                       .defaultValue(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
+
+               final Time timeout = parseDuration(configuration, timeoutOption, "timeout");
+
+               // The default is spelled out as "<millis> ms" so that Duration.apply can parse it,
+               // independent of the format produced by Time#toString.
+               ConfigOption<String> heartbeatIntervalOption = ConfigOptions
+                       .key(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL)
+                       .defaultValue(timeout.toMilliseconds() + " ms");
+
+               final Time heartbeatInterval = parseDuration(configuration, heartbeatIntervalOption, "heartbeat interval");
+
+               return new ResourceManagerConfiguration(timeout, heartbeatInterval);
+       }
+
+       /**
+        * Reads the given option from the configuration and parses it as a finite duration.
+        *
+        * @param configuration the configuration to read the option from
+        * @param option the option holding the duration string
+        * @param description name of the value, used in the error message
+        * @return the parsed duration, truncated to milliseconds
+        * @throws ConfigurationException if the value is malformed or infinite
+        */
+       private static Time parseDuration(
+                       Configuration configuration,
+                       ConfigOption<String> option,
+                       String description) throws ConfigurationException {
+               final String value = configuration.getString(option);
+
+               try {
+                       return Time.milliseconds(Duration.apply(value).toMillis());
+               } catch (IllegalArgumentException e) {
+                       // Covers the NumberFormatException (a subclass) Duration.apply throws for malformed
+                       // values and the IllegalArgumentException toMillis throws for infinite durations.
+                       throw new ConfigurationException("Could not parse the resource manager's " + description +
+                               " value '" + value + "' of option " + option.key() + '.', e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f38bf448/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index f9f55f8..926be0d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -22,7 +22,9 @@ import 
org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
 /**
@@ -33,10 +35,20 @@ import org.apache.flink.runtime.rpc.RpcService;
  */
 public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 
-       public StandaloneResourceManager(RpcService rpcService,
+       public StandaloneResourceManager(
+                       RpcService rpcService,
+                       ResourceManagerConfiguration 
resourceManagerConfiguration,
                        HighAvailabilityServices highAvailabilityServices,
-                       SlotManagerFactory slotManagerFactory) {
-               super(rpcService, highAvailabilityServices, slotManagerFactory);
+                       SlotManagerFactory slotManagerFactory,
+                       MetricRegistry metricRegistry,
+                       FatalErrorHandler fatalErrorHandler) {
+               super(
+                       rpcService,
+                       resourceManagerConfiguration,
+                       highAvailabilityServices,
+                       slotManagerFactory,
+                       metricRegistry,
+                       fatalErrorHandler);
        }
 
        @Override
@@ -45,13 +57,6 @@ public class StandaloneResourceManager extends 
ResourceManager<ResourceID> {
        }
 
        @Override
-       protected void fatalError(final String message, final Throwable error) {
-               log.error("FATAL ERROR IN RESOURCE MANAGER: " + message, error);
-               // kill this process
-               System.exit(EXIT_CODE_FATAL_ERROR);
-       }
-
-       @Override
        protected void shutDownApplication(ApplicationStatus finalStatus, 
String optionalDiagnostics) {
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f38bf448/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
new file mode 100644
index 0000000..f081fff
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.exceptions;
+
+/**
+ * Base class for configuration-related exceptions which occur when creating a configuration.
+ */
+public class ConfigurationException extends Exception {
+       private static final long serialVersionUID = 3971647332059381556L;
+
+       public ConfigurationException(String message) {
+               super(message);
+       }
+
+       public ConfigurationException(String message, Throwable cause) {
+               super(message, cause);
+       }
+
+       public ConfigurationException(Throwable cause) {
+               super(cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f38bf448/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerException.java
new file mode 100644
index 0000000..6b4d646
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.exceptions;
+
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+
+/**
+ * Base class for {@link ResourceManager} exceptions.
+ */
+public class ResourceManagerException extends Exception {
+       private static final long serialVersionUID = -5503307426519195160L;
+
+       public ResourceManagerException(String message) {
+               super(message);
+       }
+
+       public ResourceManagerException(String message, Throwable cause) {
+               super(message, cause);
+       }
+
+       public ResourceManagerException(Throwable cause) {
+               super(cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f38bf448/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java
new file mode 100644
index 0000000..1e6f04c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.exceptions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple {@link StandaloneResourceManager} runner. It instantiates the resource manager's services
+ * and handles fatal errors by shutting the resource manager down.
+ *
+ * <p>NOTE(review): this class lives in the {@code exceptions} package although it is not an
+ * exception; consider moving it next to {@link ResourceManager}.
+ */
+public class ResourceManagerRunner implements FatalErrorHandler {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ResourceManagerRunner.class);
+
+       /** Serializes {@link #shutDown()} with shut downs triggered by {@link #onFatalError(Throwable)}. */
+       private final Object lock = new Object();
+
+       private final ResourceManager<?> resourceManager;
+
+       /**
+        * Creates a {@link StandaloneResourceManager} that reports its fatal errors to this runner.
+        * The resource manager is only created here; it is started by {@link #start()}.
+        *
+        * @param configuration configuration from which the {@link ResourceManagerConfiguration} is created; not null
+        * @param rpcService RPC service of the resource manager; not null
+        * @param highAvailabilityServices high availability services of the resource manager; not null
+        * @param metricRegistry metric registry of the resource manager; not null
+        * @throws ConfigurationException if the resource manager configuration cannot be created
+        */
+       public ResourceManagerRunner(
+                       final Configuration configuration,
+                       final RpcService rpcService,
+                       final HighAvailabilityServices highAvailabilityServices,
+                       final MetricRegistry metricRegistry) throws 
ConfigurationException {
+
+               Preconditions.checkNotNull(configuration);
+               Preconditions.checkNotNull(rpcService);
+               Preconditions.checkNotNull(highAvailabilityServices);
+               Preconditions.checkNotNull(metricRegistry);
+
+               final ResourceManagerConfiguration resourceManagerConfiguration 
= ResourceManagerConfiguration.fromConfiguration(configuration);
+               final SlotManagerFactory slotManagerFactory = new 
DefaultSlotManager.Factory();
+
+               this.resourceManager = new StandaloneResourceManager(
+                       rpcService,
+                       resourceManagerConfiguration,
+                       highAvailabilityServices,
+                       slotManagerFactory,
+                       metricRegistry,
+                       this);
+       }
+
+       
//-------------------------------------------------------------------------------------
+       // Lifecycle management
+       
//-------------------------------------------------------------------------------------
+
+       /**
+        * Starts the resource manager created by the constructor.
+        *
+        * @throws Exception if starting the resource manager fails
+        */
+       public void start() throws Exception {
+               resourceManager.start();
+       }
+
+       /**
+        * Shuts the resource manager down.
+        *
+        * @throws Exception if shutting down the resource manager fails
+        */
+       public void shutDown() throws Exception {
+               shutDownInternally();
+       }
+
+       /** Shuts the resource manager down while holding the runner's lock. */
+       private void shutDownInternally() throws Exception {
+               synchronized (lock) {
+                       resourceManager.shutDown();
+               }
+       }
+
+       
//-------------------------------------------------------------------------------------
+       // Fatal error handler
+       
//-------------------------------------------------------------------------------------
+
+       /**
+        * Logs the fatal error and shuts the resource manager down. Exceptions thrown
+        * while shutting down are logged and not propagated.
+        *
+        * <p>NOTE(review): the lock only serializes concurrent shut downs; if {@link #shutDown()}
+        * is called as well, the resource manager is shut down twice. Confirm this is harmless.
+        *
+        * @param exception the fatal error
+        */
+       @Override
+       public void onFatalError(Throwable exception) {
+               LOG.error("Encountered fatal error.", exception);
+
+               try {
+                       shutDownInternally();
+               } catch (Exception e) {
+                       LOG.error("Could not properly shut down the resource 
manager.", e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f38bf448/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
index ff28f94..7ee7a1f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.resourcemanager.registration;
 
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 
@@ -29,11 +30,11 @@ public class WorkerRegistration<WorkerType extends Serializable> extends TaskExe
 
        private static final long serialVersionUID = -2062957799469434614L;
 
-       private WorkerType worker;
+       private final WorkerType worker;
 
        public WorkerRegistration(TaskExecutorGateway taskExecutorGateway, WorkerType worker) {
                super(taskExecutorGateway);
-               this.worker = worker;
+               this.worker = Preconditions.checkNotNull(worker);
        }
 
        public WorkerType getWorker() {

http://git-wip-us.apache.org/repos/asf/flink/blob/f38bf448/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index ce1fdca..cb38e6e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -18,24 +18,20 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
-import org.apache.flink.runtime.rpc.MainThreadExecutable;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.StartStoppable;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.UUID;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * resourceManager HA test, including grant leadership and revoke leadership
@@ -44,37 +40,38 @@ public class ResourceManagerHATest {
 
        @Test
        public void testGrantAndRevokeLeadership() throws Exception {
-               // mock a RpcService which will return a special RpcGateway when call its startServer method,
-               // the returned RpcGateway directly executes runAsync calls
-               TestingResourceManagerGatewayProxy gateway = mock(TestingResourceManagerGatewayProxy.class);
-               doCallRealMethod().when(gateway).runAsync(any(Runnable.class));
-
-               RpcService rpcService = mock(RpcService.class);
-               when(rpcService.startServer(any(RpcEndpoint.class))).thenReturn(gateway);
+               RpcService rpcService = new TestingSerialRpcService();
 
                TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
                TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
                highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
 
+               ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
+               SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
+               MetricRegistry metricRegistry = mock(MetricRegistry.class);
+               TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
                final ResourceManager resourceManager =
-                       new TestingResourceManager(rpcService, highAvailabilityServices);
+                       new StandaloneResourceManager(
+                               rpcService,
+                               resourceManagerConfiguration,
+                               highAvailabilityServices,
+                               slotManagerFactory,
+                               metricRegistry,
+                               testingFatalErrorHandler);
                resourceManager.start();
                // before grant leadership, resourceManager's leaderId is null
-               Assert.assertEquals(new UUID(0,0), resourceManager.getLeaderSessionID());
+               Assert.assertEquals(null, resourceManager.getLeaderSessionID());
                final UUID leaderId = UUID.randomUUID();
                leaderElectionService.isLeader(leaderId);
                // after grant leadership, resourceManager's leaderId has value
                Assert.assertEquals(leaderId, resourceManager.getLeaderSessionID());
                // then revoke leadership, resourceManager's leaderId is null again
                leaderElectionService.notLeader();
-               Assert.assertEquals(new UUID(0,0), resourceManager.getLeaderSessionID());
-       }
+               Assert.assertEquals(null, resourceManager.getLeaderSessionID());
 
-       private static abstract class TestingResourceManagerGatewayProxy implements MainThreadExecutable, StartStoppable, RpcGateway {
-               @Override
-               public void runAsync(Runnable runnable) {
-                       runnable.run();
+               if (testingFatalErrorHandler.hasExceptionOccurred()) {
+                       testingFatalErrorHandler.rethrowError();
                }
        }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f38bf448/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 14afd0e..7b8d254 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -19,14 +19,19 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -61,13 +66,18 @@ public class ResourceManagerJobMasterTest {
                TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
                UUID jmLeaderID = UUID.randomUUID();
                TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
-               final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+               TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+               final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
                final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 
                // test response successful
                Future<RegistrationResponse> successfulFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderID, jobMasterAddress, jobID);
                RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
                assertTrue(response instanceof JobMasterRegistrationSuccess);
+
+               if (testingFatalErrorHandler.hasExceptionOccurred()) {
+                       testingFatalErrorHandler.rethrowError();
+               }
        }
 
        /**
@@ -80,13 +90,18 @@ public class ResourceManagerJobMasterTest {
                TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
                UUID jmLeaderID = UUID.randomUUID();
                TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
-               final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+               TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+               final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
                final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 
                // test throw exception when receive a registration from job master which takes unmatched leaderSessionId
                UUID differentLeaderSessionID = UUID.randomUUID();
                Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobMaster(differentLeaderSessionID, jmLeaderID, jobMasterAddress, jobID);
                assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+
+               if (testingFatalErrorHandler.hasExceptionOccurred()) {
+                       testingFatalErrorHandler.rethrowError();
+               }
        }
 
        /**
@@ -98,7 +113,8 @@ public class ResourceManagerJobMasterTest {
                JobID jobID = mockJobMaster(jobMasterAddress);
                TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
                TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService();
-               final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+               TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+               final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
                final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
                final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 
@@ -106,6 +122,10 @@ public class ResourceManagerJobMasterTest {
                UUID differentLeaderSessionID = UUID.randomUUID();
                Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobMaster(rmLeaderSessionId, differentLeaderSessionID, jobMasterAddress, jobID);
                assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+
+               if (testingFatalErrorHandler.hasExceptionOccurred()) {
+                       testingFatalErrorHandler.rethrowError();
+               }
        }
 
        /**
@@ -117,7 +137,8 @@ public class ResourceManagerJobMasterTest {
                JobID jobID = mockJobMaster(jobMasterAddress);
                TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
                TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService();
-               final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+               TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+               final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
                final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
                final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 
@@ -125,6 +146,10 @@ public class ResourceManagerJobMasterTest {
                String invalidAddress = "/jobMasterAddress2";
                Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderSessionId, invalidAddress, jobID);
                assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+
+               if (testingFatalErrorHandler.hasExceptionOccurred()) {
+                       testingFatalErrorHandler.rethrowError();
+               }
        }
 
        /**
@@ -136,7 +161,8 @@ public class ResourceManagerJobMasterTest {
                JobID jobID = mockJobMaster(jobMasterAddress);
                TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
                TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService();
-               final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+               TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+               final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
                final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
                final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 
@@ -145,6 +171,10 @@ public class ResourceManagerJobMasterTest {
                Future<RegistrationResponse> declineFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderSessionId, jobMasterAddress, unknownJobIDToHAServices);
                RegistrationResponse response = declineFuture.get(5, TimeUnit.SECONDS);
                assertTrue(response instanceof RegistrationResponse.Decline);
+
+               if (testingFatalErrorHandler.hasExceptionOccurred()) {
+                       testingFatalErrorHandler.rethrowError();
+               }
        }
 
        private JobID mockJobMaster(String jobMasterAddress) {
@@ -154,11 +184,26 @@ public class ResourceManagerJobMasterTest {
                return jobID;
        }
 
-       private ResourceManager createAndStartResourceManager(TestingLeaderElectionService resourceManagerLeaderElectionService, JobID jobID, TestingLeaderRetrievalService jobMasterLeaderRetrievalService) {
+       private ResourceManager createAndStartResourceManager(
+                       TestingLeaderElectionService resourceManagerLeaderElectionService,
+                       JobID jobID,
+                       TestingLeaderRetrievalService jobMasterLeaderRetrievalService,
+                       FatalErrorHandler fatalErrorHandler) throws Exception {
                TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
                highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
                highAvailabilityServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService);
-               ResourceManager resourceManager = new TestingResourceManager(rpcService, highAvailabilityServices);
+
+               ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
+               SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
+               MetricRegistry metricRegistry = mock(MetricRegistry.class);
+
+               ResourceManager resourceManager = new StandaloneResourceManager(
+                       rpcService,
+                       resourceManagerConfiguration,
+                       highAvailabilityServices,
+                       slotManagerFactory,
+                       metricRegistry,
+                       fatalErrorHandler);
                resourceManager.start();
                return resourceManager;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f38bf448/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index a577c26..4640eab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -18,15 +18,19 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,7 +38,6 @@ import org.junit.Test;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -53,13 +56,16 @@ public class ResourceManagerTaskExecutorTest {
 
        private UUID leaderSessionId;
 
+       private TestingFatalErrorHandler testingFatalErrorHandler;
+
        @Before
        public void setup() throws Exception {
                rpcService = new TestingSerialRpcService();
 
                taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
                TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
-               resourceManager = createAndStartResourceManager(rmLeaderElectionService);
+               testingFatalErrorHandler = new TestingFatalErrorHandler();
+               resourceManager = createAndStartResourceManager(rmLeaderElectionService, testingFatalErrorHandler);
                leaderSessionId = grantLeadership(rmLeaderElectionService);
        }
 
@@ -73,18 +79,24 @@ public class ResourceManagerTaskExecutorTest {
         */
        @Test
        public void testRegisterTaskExecutor() throws Exception {
-               // test response successful
-               Future<RegistrationResponse> successfulFuture =
-                       resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport);
-               RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
-               assertTrue(response instanceof TaskExecutorRegistrationSuccess);
-
-               // test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor
-               Future<RegistrationResponse> duplicateFuture =
-                       resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport);
-               RegistrationResponse duplicateResponse = duplicateFuture.get();
-               assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
-               assertNotEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
+               try {
+                       // test response successful
+                       Future<RegistrationResponse> successfulFuture =
+                               resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport);
+                       RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
+                       assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+
+                       // test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor
+                       Future<RegistrationResponse> duplicateFuture =
+                               resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport);
+                       RegistrationResponse duplicateResponse = duplicateFuture.get();
+                       assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
+                       assertNotEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
+               } finally {
+                       if (testingFatalErrorHandler.hasExceptionOccurred()) {
+                               testingFatalErrorHandler.rethrowError();
+                       }
+               }
        }
 
        /**
@@ -92,11 +104,17 @@ public class ResourceManagerTaskExecutorTest {
         */
        @Test
        public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception {
-               // test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId
-               UUID differentLeaderSessionID = UUID.randomUUID();
-               Future<RegistrationResponse> unMatchedLeaderFuture =
-                       resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID, slotReport);
-               assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+               try {
+                       // test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId
+                       UUID differentLeaderSessionID = UUID.randomUUID();
+                       Future<RegistrationResponse> unMatchedLeaderFuture =
+                               resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID, slotReport);
+                       assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+               } finally {
+                       if (testingFatalErrorHandler.hasExceptionOccurred()) {
+                               testingFatalErrorHandler.rethrowError();
+                       }
+               }
        }
 
        /**
@@ -104,11 +122,17 @@ public class ResourceManagerTaskExecutorTest {
         */
        @Test
        public void testRegisterTaskExecutorFromInvalidAddress() throws Exception {
-               // test throw exception when receive a registration from taskExecutor which takes invalid address
-               String invalidAddress = "/taskExecutor2";
-               Future<RegistrationResponse> invalidAddressFuture =
-                       resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID, slotReport);
-               assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+               try {
+                       // test throw exception when receive a registration from taskExecutor which takes invalid address
+                       String invalidAddress = "/taskExecutor2";
+                       Future<RegistrationResponse> invalidAddressFuture =
+                               resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID, slotReport);
+                       assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+               } finally {
+                       if (testingFatalErrorHandler.hasExceptionOccurred()) {
+                               testingFatalErrorHandler.rethrowError();
+                       }
+               }
        }
 
        private ResourceID mockTaskExecutor(String taskExecutorAddress) {
@@ -118,11 +142,22 @@ public class ResourceManagerTaskExecutorTest {
                return taskExecutorResourceID;
        }
 
-       private StandaloneResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService) {
+       private StandaloneResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception {
                TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
                highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
+               TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
+               ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
+               MetricRegistry metricRegistry = mock(MetricRegistry.class);
+
+
                StandaloneResourceManager resourceManager =
-                       new TestingResourceManager(rpcService, highAvailabilityServices);
+                       new StandaloneResourceManager(
+                               rpcService,
+                               resourceManagerConfiguration,
+                               highAvailabilityServices,
+                               slotManagerFactory,
+                               metricRegistry,
+                               fatalErrorHandler);
                resourceManager.start();
                return resourceManager;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f38bf448/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
deleted file mode 100644
index 6b4ca14..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
-import org.apache.flink.runtime.rpc.RpcService;
-
-public class TestingResourceManager extends StandaloneResourceManager {
-
-       public TestingResourceManager(RpcService rpcService) {
-               this(rpcService, new TestingHighAvailabilityServices());
-       }
-
-       public TestingResourceManager(
-                       RpcService rpcService,
-                       HighAvailabilityServices highAvailabilityServices) {
-               this(rpcService, highAvailabilityServices, new TestingSlotManagerFactory());
-       }
-
-       public TestingResourceManager(
-                       RpcService rpcService,
-                       HighAvailabilityServices highAvailabilityServices,
-                       SlotManagerFactory slotManagerFactory) {
-               super(rpcService, highAvailabilityServices, slotManagerFactory);
-       }
-
-       private static class TestingSlotManagerFactory implements SlotManagerFactory {
-
-               @Override
-               public SlotManager create(ResourceManagerServices rmServices) {
-                       return new TestingSlotManager(rmServices);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f38bf448/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManagerFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManagerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManagerFactory.java
new file mode 100644
index 0000000..6b5f6b2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManagerFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+
+public class TestingSlotManagerFactory implements SlotManagerFactory {
+
+       @Override
+       public SlotManager create(ResourceManagerServices rmServices) {
+               return new TestingSlotManager(rmServices);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f38bf448/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 86cd1f8..08ceb86 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -30,14 +30,18 @@ import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.resourcemanager.TestingResourceManager;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
 import org.apache.flink.runtime.resourcemanager.TestingSlotManager;
 import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
 import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
@@ -102,9 +106,17 @@ public class SlotProtocolTest extends TestLogger {
                TestingLeaderElectionService rmLeaderElectionService =
                        configureHA(testingHaServices, jobID, rmAddress, 
rmLeaderID, jmAddress, jmLeaderID);
 
+               ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
+
                final TestingSlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
                SpiedResourceManager resourceManager =
-                       new SpiedResourceManager(testRpcService, 
testingHaServices, slotManagerFactory);
+                       new SpiedResourceManager(
+                               testRpcService,
+                               resourceManagerConfiguration,
+                               testingHaServices,
+                               slotManagerFactory,
+                               mock(MetricRegistry.class),
+                               mock(FatalErrorHandler.class));
                resourceManager.start();
                rmLeaderElectionService.isLeader(rmLeaderID);
 
@@ -193,16 +205,26 @@ public class SlotProtocolTest extends TestLogger {
                        .thenReturn(new 
FlinkCompletableFuture<TMSlotRequestReply>());
                testRpcService.registerGateway(tmAddress, taskExecutorGateway);
 
+               ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
+
                TestingSlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
-               TestingResourceManager resourceManager =
-                       Mockito.spy(new TestingResourceManager(testRpcService, 
testingHaServices, slotManagerFactory));
+               ResourceManager<ResourceID> resourceManager =
+                       Mockito.spy(new StandaloneResourceManager(
+                               testRpcService,
+                               resourceManagerConfiguration,
+                               testingHaServices,
+                               slotManagerFactory,
+                               mock(MetricRegistry.class),
+                               mock(FatalErrorHandler.class)));
                resourceManager.start();
                rmLeaderElectionService.isLeader(rmLeaderID);
 
+               Thread.sleep(1000);
+
                Future<RegistrationResponse> registrationFuture =
                        resourceManager.registerJobMaster(rmLeaderID, 
jmLeaderID, jmAddress, jobID);
                try {
-                       registrationFuture.get(5, TimeUnit.SECONDS);
+                       registrationFuture.get(5L, TimeUnit.SECONDS);
                } catch (Exception e) {
                        Assert.fail("JobManager registration Future didn't 
become ready.");
                }
@@ -258,15 +280,24 @@ public class SlotProtocolTest extends TestLogger {
                return rmLeaderElectionService;
        }
 
-       private static class SpiedResourceManager extends 
TestingResourceManager {
+       private static class SpiedResourceManager extends 
StandaloneResourceManager {
 
                private int startNewWorkerCalled = 0;
 
                public SpiedResourceManager(
                                RpcService rpcService,
+                               ResourceManagerConfiguration 
resourceManagerConfiguration,
                                HighAvailabilityServices 
highAvailabilityServices,
-                               SlotManagerFactory slotManagerFactory) {
-                       super(rpcService, highAvailabilityServices, 
slotManagerFactory);
+                               SlotManagerFactory slotManagerFactory,
+                               MetricRegistry metricRegistry,
+                               FatalErrorHandler fatalErrorHandler) {
+                       super(
+                               rpcService,
+                               resourceManagerConfiguration,
+                               highAvailabilityServices,
+                               slotManagerFactory,
+                               metricRegistry,
+                               fatalErrorHandler);
                }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f38bf448/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index caae54e..553db37 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -65,12 +65,12 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
-import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
@@ -81,15 +81,12 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 import org.mockito.Matchers;
 import org.powermock.api.mockito.PowerMockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
@@ -434,7 +431,7 @@ public class TaskExecutorTest extends TestLogger {
                                        any(Time.class));
                } finally {
                        // check if a concurrent error occurred
-                       testingFatalErrorHandler.rethrowException();
+                       testingFatalErrorHandler.rethrowError();
 
                        rpc.stopService();
                }
@@ -541,63 +538,12 @@ public class TaskExecutorTest extends TestLogger {
                        assertTrue(taskSlotTable.isSlotFree(1));
                } finally {
                        // check if a concurrent error occurred
-                       testingFatalErrorHandler.rethrowException();
+                       testingFatalErrorHandler.rethrowError();
 
                        rpc.stopService();
                }
        }
 
-       private static class TestingFatalErrorHandler implements 
FatalErrorHandler {
-               private static final Logger LOG = 
LoggerFactory.getLogger(TestingFatalErrorHandler.class);
-               private final AtomicReference<Throwable> atomicThrowable;
-
-               public TestingFatalErrorHandler() {
-                       atomicThrowable = new AtomicReference<>(null);
-               }
-
-               public void rethrowException() throws TestingException {
-                       Throwable throwable = atomicThrowable.get();
-
-                       if (throwable != null) {
-                               throw new TestingException(throwable);
-                       }
-               }
-
-               public boolean hasExceptionOccurred() {
-                       return atomicThrowable.get() != null;
-               }
-
-               public Throwable getException() {
-                       return atomicThrowable.get();
-               }
-
-               @Override
-               public void onFatalError(Throwable exception) {
-                       LOG.error("OnFatalError:", exception);
-                       atomicThrowable.compareAndSet(null, exception);
-               }
-
-               
//------------------------------------------------------------------
-               // static utility classes
-               
//------------------------------------------------------------------
-
-               private static final class TestingException extends Exception {
-                       public TestingException(String message) {
-                               super(message);
-                       }
-
-                       public TestingException(String message, Throwable 
cause) {
-                               super(message, cause);
-                       }
-
-                       public TestingException(Throwable cause) {
-                               super(cause);
-                       }
-
-                       private static final long serialVersionUID = 
-4648195335470914498L;
-               }
-       }
-
        /**
         * Tests that all allocation requests for slots are ignored if the slot 
has been reported as
         * free by the TaskExecutor but this report hasn't been confirmed by 
the ResourceManager.

Reply via email to