[FLINK-4354] [heartbeat] Add heartbeats between the ResourceManager and TaskExecutor


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/83b99f8a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/83b99f8a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/83b99f8a

Branch: refs/heads/table-retraction
Commit: 83b99f8a624ddf35deb934b4d4358582657998c6
Parents: fd90672
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Mar 22 12:03:45 2017 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Mar 23 13:58:44 2017 +0100

----------------------------------------------------------------------
 .../heartbeat/TestingHeartbeatManagerImpl.java  | 63 -----------------
 .../TestingHeartbeatManagerSenderImpl.java      | 61 -----------------
 .../heartbeat/TestingHeartbeatServices.java     | 52 --------------
 .../flink/runtime/jobmaster/JobMaster.java      |  4 +-
 .../flink/runtime/minicluster/MiniCluster.java  |  7 +-
 .../resourcemanager/ResourceManager.java        | 71 +++++++++++---------
 .../resourcemanager/ResourceManagerGateway.java |  7 +-
 .../resourcemanager/ResourceManagerRunner.java  |  2 +-
 .../StandaloneResourceManager.java              |  4 +-
 .../runtime/taskexecutor/TaskExecutor.java      | 63 ++++++++++-------
 .../clusterframework/ResourceManagerTest.java   |  6 +-
 .../heartbeat/TestingHeartbeatServices.java     | 52 ++++++++++++++
 .../flink/runtime/jobmaster/JobMasterTest.java  | 10 ++-
 .../resourcemanager/ResourceManagerHATest.java  |  3 +-
 .../ResourceManagerJobMasterTest.java           |  6 +-
 .../ResourceManagerTaskExecutorTest.java        |  7 +-
 .../slotmanager/SlotProtocolTest.java           |  6 +-
 .../taskexecutor/TaskExecutorITCase.java        |  3 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 16 +++--
 .../src/test/resources/log4j-test.properties    |  2 +-
 .../yarn/YarnFlinkApplicationMasterRunner.java  |  3 +-
 .../apache/flink/yarn/YarnResourceManager.java  |  4 +-
 22 files changed, 181 insertions(+), 271 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
deleted file mode 100644
index a6e056d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.heartbeat;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.slf4j.Logger;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-
-/**
- * Heartbeat manager implementation which extends {@link HeartbeatManagerImpl} for testing.
- * It overrides the {@link #unmonitorTarget(ResourceID)} to wait for some tests complete
- * when notify heartbeat timeout.
- *
- * @param <I> Type of the incoming heartbeat payload
- * @param <O> Type of the outgoing heartbeat payload
- */
-public class TestingHeartbeatManagerImpl<I, O> extends HeartbeatManagerImpl<I, O> {
-
-       private final CountDownLatch waitLatch;
-
-       public TestingHeartbeatManagerImpl(
-                       CountDownLatch waitLatch,
-                       long heartbeatTimeoutIntervalMs,
-                       ResourceID ownResourceID,
-                       HeartbeatListener<I, O> heartbeatListener,
-                       Executor executor,
-                       ScheduledExecutor scheduledExecutor,
-                       Logger log) {
-
-               super(heartbeatTimeoutIntervalMs, ownResourceID, heartbeatListener, executor, scheduledExecutor, log);
-
-               this.waitLatch = waitLatch;
-       }
-
-       @Override
-       public void unmonitorTarget(ResourceID resourceID) {
-               try {
-                       waitLatch.await();
-               } catch (InterruptedException ex) {
-                       log.error("Unexpected interrupted exception.", ex);
-               }
-
-               super.unmonitorTarget(resourceID);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
deleted file mode 100644
index 36f7e96..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.heartbeat;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.slf4j.Logger;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-
-/**
- *
- * @param <I>
- * @param <O>
- */
-public class TestingHeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerSenderImpl<I, O> {
-
-       private final CountDownLatch waitLatch;
-
-       public TestingHeartbeatManagerSenderImpl(
-                       CountDownLatch waitLatch,
-                       long heartbeatPeriod,
-                       long heartbeatTimeout,
-                       ResourceID ownResourceID,
-                       HeartbeatListener<I, O> heartbeatListener,
-                       Executor executor,
-                       ScheduledExecutor scheduledExecutor,
-                       Logger log) {
-
-               super(heartbeatPeriod, heartbeatTimeout, ownResourceID, heartbeatListener, executor, scheduledExecutor, log);
-
-               this.waitLatch = waitLatch;
-       }
-
-       @Override
-       public void unmonitorTarget(ResourceID resourceID) {
-               try {
-                       waitLatch.await();
-               } catch (InterruptedException ex) {
-                       log.error("Unexpected interrupted exception.", ex);
-               }
-
-               super.unmonitorTarget(resourceID);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
deleted file mode 100644
index e628db5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.heartbeat;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-
-public class TestingHeartbeatServices extends HeartbeatServices {
-
-       private final ScheduledExecutor scheduledExecutorToUse;
-
-       public TestingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout, ScheduledExecutor scheduledExecutorToUse) {
-               super(heartbeatInterval, heartbeatTimeout);
-
-               this.scheduledExecutorToUse = Preconditions.checkNotNull(scheduledExecutorToUse);
-       }
-
-       @Override
-       public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
-               ResourceID resourceId,
-               HeartbeatListener<I, O> heartbeatListener,
-               ScheduledExecutor scheduledExecutor,
-               Logger log) {
-
-               return new HeartbeatManagerSenderImpl<>(
-                       heartbeatInterval,
-                       heartbeatTimeout,
-                       resourceId,
-                       heartbeatListener,
-                       org.apache.flink.runtime.concurrent.Executors.directExecutor(),
-                       scheduledExecutorToUse,
-                       log);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 81fc541..080b48e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1043,11 +1043,11 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
                @Override
                public void notifyHeartbeatTimeout(ResourceID resourceID) {
-                       log.info("Task manager with id {} heartbeat timed out.", resourceID);
+                       log.info("Heartbeat of TaskManager with id {} timed out.", resourceID);
 
                        getSelf().disconnectTaskManager(
                                resourceID,
-                               new TimeoutException("Task manager with id " + resourceID + " heartbeat timed out."));
+                               new TimeoutException("Heartbeat of TaskManager with id " + resourceID + " timed out."));
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 2cfba7b..9d5f9d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -241,7 +241,12 @@ public class MiniCluster {
                                // bring up the ResourceManager(s)
                                LOG.info("Starting {} ResourceManger(s)", numResourceManagers);
                                resourceManagerRunners = startResourceManagers(
-                                               configuration, haServices, heartbeatServices, metricRegistry, numResourceManagers, resourceManagerRpcServices);
+                                       configuration,
+                                       haServices,
+                                       heartbeatServices,
+                                       metricRegistry,
+                                       numResourceManagers,
+                                       resourceManagerRpcServices);
 
                                // bring up the TaskManager(s) for the mini cluster
                                LOG.info("Starting {} TaskManger(s)", numTaskManagers);

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 9a7a790..5467177 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -64,6 +64,7 @@ import org.apache.flink.util.ExceptionUtils;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -129,8 +130,8 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        private ConcurrentMap<String, InfoMessageListenerRpcGateway> 
infoMessageListeners;
 
        public ResourceManager(
-                       ResourceID resourceId,
                        RpcService rpcService,
+                       ResourceID resourceId,
                        ResourceManagerConfiguration 
resourceManagerConfiguration,
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
@@ -359,7 +360,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                final ResourceID taskExecutorResourceId,
                final SlotReport slotReport) {
 
-               if (leaderSessionId.equals(resourceManagerLeaderId)) {
+               if (Objects.equals(leaderSessionId, resourceManagerLeaderId)) {
                        Future<TaskExecutorGateway> taskExecutorGatewayFuture = 
getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
 
                        return taskExecutorGatewayFuture.handleAsync(new 
BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
@@ -384,7 +385,8 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                                                
taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new 
HeartbeatTarget<Void>() {
                                                        @Override
                                                        public void 
receiveHeartbeat(ResourceID resourceID, Void payload) {
-                                                               // the task 
manager will not request heartbeat, so this method will never be called 
currently
+                                                               // the 
ResourceManager will always send heartbeat requests to the
+                                                               // TaskManager
                                                        }
 
                                                        @Override
@@ -394,7 +396,8 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                                                });
 
                                                return new 
TaskExecutorRegistrationSuccess(
-                                                       
registration.getInstanceID(), resourceId,
+                                                       
registration.getInstanceID(),
+                                                       resourceId,
                                                        
resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds());
                                        }
                                }
@@ -607,6 +610,30 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        }
 
        /**
+        * This method should be called by the framework once it detects that a 
currently registered
+        * task executor has failed.
+        *
+        * @param resourceID Id of the TaskManager that has failed.
+        * @param cause The exception which cause the TaskManager failed.
+        */
+       protected void closeTaskManagerConnection(final ResourceID resourceID, 
final Exception cause) {
+               taskManagerHeartbeatManager.unmonitorTarget(resourceID);
+
+               WorkerRegistration<WorkerType> workerRegistration = 
taskExecutors.remove(resourceID);
+
+               if (workerRegistration != null) {
+                       log.info("Task manager {} failed because {}.", 
resourceID, cause);
+
+                       // TODO :: suggest failed task executor to stop itself
+                       slotManager.notifyTaskManagerFailure(resourceID);
+
+                       
workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
+               } else {
+                       log.debug("Could not find a registered task manager 
with the process id {}.", resourceID);
+               }
+       }
+
+       /**
         * Checks whether the given resource manager leader id is matching the 
current leader id and
         * not null.
         *
@@ -756,30 +783,6 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                onFatalErrorAsync(new ResourceManagerException("Received an 
error from the LeaderElectionService.", exception));
        }
 
-       /**
-        * This method should be called by the framework once it detects that a 
currently registered
-        * task executor has failed.
-        *
-        * @param resourceID Id of the TaskManager that has failed.
-        * @param cause The exception which cause the TaskManager failed.
-        */
-       public void closeTaskManagerConnection(final ResourceID resourceID, 
final Exception cause) {
-               taskManagerHeartbeatManager.unmonitorTarget(resourceID);
-
-               WorkerRegistration<WorkerType> workerRegistration = 
taskExecutors.remove(resourceID);
-
-               if (workerRegistration != null) {
-                       log.info("Task manager {} failed because {}.", 
resourceID, cause);
-
-                       // TODO :: suggest failed task executor to stop itself
-                       slotManager.notifyTaskManagerFailure(resourceID);
-
-                       
workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
-               } else {
-                       log.debug("Could not find a registered task manager 
with the process id {}.", resourceID);
-               }
-       }
-
        // 
------------------------------------------------------------------------
        //  Framework specific behavior
        // 
------------------------------------------------------------------------
@@ -875,11 +878,17 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        private class TaskManagerHeartbeatListener implements 
HeartbeatListener<Void, Void> {
 
                @Override
-               public void notifyHeartbeatTimeout(ResourceID resourceID) {
+               public void notifyHeartbeatTimeout(final ResourceID resourceID) 
{
                        log.info("The heartbeat of TaskManager with id {} timed 
out.", resourceID);
 
-                       closeTaskManagerConnection(resourceID, new 
TimeoutException(
-                                       "Task manager with id " + resourceID + 
" heartbeat timed out."));
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       closeTaskManagerConnection(
+                                               resourceID,
+                                               new TimeoutException("Task 
manager with id " + resourceID + " heartbeat timed out."));
+                               }
+                       });
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 7741e0d..cda4a7c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -134,13 +134,12 @@ public interface ResourceManagerGateway extends 
RpcGateway {
        /**
         * Sends the heartbeat to resource manager from task manager
         *
-        * @param resourceID unique id of the task manager
+        * @param heartbeatOrigin unique id of the task manager
         */
-       void heartbeatFromTaskManager(final ResourceID resourceID);
+       void heartbeatFromTaskManager(final ResourceID heartbeatOrigin);
 
        /**
-        * Disconnects the given {@link 
org.apache.flink.runtime.taskexecutor.TaskExecutor} from the
-        * {@link ResourceManager}.
+        * Disconnects a TaskManager specified by the given resourceID from the 
{@link ResourceManager}.
         *
         * @param resourceID identifying the TaskManager to disconnect
         * @param cause for the disconnection of the TaskManager

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index d07e373..3a8baa6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -69,8 +69,8 @@ public class ResourceManagerRunner implements 
FatalErrorHandler {
                        rpcService.getScheduledExecutor());
 
                this.resourceManager = new StandaloneResourceManager(
-                       resourceId,
                        rpcService,
+                       resourceId,
                        resourceManagerConfiguration,
                        highAvailabilityServices,
                        heartbeatServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index e2d6538..fd5a001 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -38,8 +38,8 @@ import org.apache.flink.runtime.rpc.RpcService;
 public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 
        public StandaloneResourceManager(
-                       ResourceID resourceId,
                        RpcService rpcService,
+                       ResourceID resourceId,
                        ResourceManagerConfiguration 
resourceManagerConfiguration,
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
@@ -48,8 +48,8 @@ public class StandaloneResourceManager extends 
ResourceManager<ResourceID> {
                        JobLeaderIdService jobLeaderIdService,
                        FatalErrorHandler fatalErrorHandler) {
                super(
-                       resourceId,
                        rpcService,
+                       resourceId,
                        resourceManagerConfiguration,
                        highAvailabilityServices,
                        heartbeatServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index f3e1ff3..4883e7d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -696,17 +696,35 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                }
        }
 
+       private void establishResourceManagerConnection(ResourceID 
resourceManagerResourceId) {
+               // monitor the resource manager as heartbeat target
+               
resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new 
HeartbeatTarget<Void>() {
+                       @Override
+                       public void receiveHeartbeat(ResourceID resourceID, 
Void payload) {
+                               ResourceManagerGateway resourceManagerGateway = 
resourceManagerConnection.getTargetGateway();
+                               
resourceManagerGateway.heartbeatFromTaskManager(resourceID);
+                       }
+
+                       @Override
+                       public void requestHeartbeat(ResourceID resourceID, 
Void payload) {
+                               // the TaskManager won't send heartbeat 
requests to the ResourceManager
+                       }
+               });
+       }
+
        private void closeResourceManagerConnection(Exception cause) {
-               log.info("Close ResourceManager connection for {}.", cause);
+               validateRunsInMainThread();
 
                if (isConnectedToResourceManager()) {
+                       log.info("Close ResourceManager connection {}.", 
resourceManagerConnection.getResourceManagerId(), cause);
+
                        
resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());
 
                        ResourceManagerGateway resourceManagerGateway = 
resourceManagerConnection.getTargetGateway();
+                       
resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
+
                        resourceManagerConnection.close();
                        resourceManagerConnection = null;
-
-                       
resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
                }
        }
 
@@ -790,7 +808,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                                                                        "and 
returning them to the ResourceManager.", throwable);
 
                                                        // We encountered an 
exception. Free the slots and return them to the RM.
-                                                       for (SlotOffer 
reservedSlot : reservedSlots) {
+                                                       for (SlotOffer 
reservedSlot: reservedSlots) {
                                                                
freeSlot(reservedSlot.getAllocationId(), throwable);
                                                        }
                                                }
@@ -841,6 +859,8 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        }
 
        private void closeJobManagerConnection(JobID jobId, Exception cause) {
+               validateRunsInMainThread();
+
                log.info("Close JobManager connection for job {}.", jobId);
 
                // 1. fail tasks running under this JobID
@@ -1183,21 +1203,14 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                public void 
onRegistrationSuccess(TaskExecutorRegistrationSuccess success) {
                        final ResourceID resourceManagerId = 
success.getResourceManagerId();
 
-                       // monitor the resource manager as heartbeat target
-                       
resourceManagerHeartbeatManager.monitorTarget(resourceManagerId, new 
HeartbeatTarget<Void>() {
-                               @Override
-                               public void receiveHeartbeat(ResourceID 
resourceID, Void payload) {
-                                       if (isConnectedToResourceManager()) {
-                                               ResourceManagerGateway 
resourceManagerGateway = resourceManagerConnection.getTargetGateway();
-                                               
resourceManagerGateway.heartbeatFromTaskManager(resourceID);
+                       runAsync(
+                               new Runnable() {
+                                       @Override
+                                       public void run() {
+                                               
establishResourceManagerConnection(resourceManagerId);
                                        }
                                }
-
-                               @Override
-                               public void requestHeartbeat(ResourceID 
resourceID, Void payload) {
-                                       // request heartbeat will never be 
called on the task manager side
-                               }
-                       });
+                       );
                }
 
                @Override
@@ -1277,14 +1290,15 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                        runAsync(new Runnable() {
                                @Override
                                public void run() {
-                                       log.info("Job manager with id {} 
heartbeat timed out.", resourceID);
+                                       log.info("The heartbeat of JobManager 
with id {} timed out.", resourceID);
 
                                        if 
(jobManagerConnections.containsKey(resourceID)) {
                                                JobManagerConnection 
jobManagerConnection = jobManagerConnections.get(resourceID);
+
                                                if (jobManagerConnection != 
null) {
                                                        
closeJobManagerConnection(
                                                                
jobManagerConnection.getJobID(),
-                                                               new 
TimeoutException("Job manager with id " + resourceID + " heartbeat timed 
out."));
+                                                               new 
TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed 
out."));
                                                }
                                        }
                                }
@@ -1305,16 +1319,15 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        private class ResourceManagerHeartbeatListener implements 
HeartbeatListener<Void, Void> {
 
                @Override
-               public void notifyHeartbeatTimeout(final ResourceID resourceID) 
{
+               public void notifyHeartbeatTimeout(final ResourceID resourceId) 
{
                        runAsync(new Runnable() {
                                @Override
                                public void run() {
-                                       log.info("Resource manager with id {} 
heartbeat timed out.", resourceID);
+                                       log.info("The heartbeat of 
ResourceManager with id {} timed out.", resourceId);
 
-                                       if (isConnectedToResourceManager() && 
resourceManagerConnection.getResourceManagerId().equals(resourceID)) {
-                                               closeResourceManagerConnection(
-                                                               new 
TimeoutException("Resource manager with id " + resourceID + " heartbeat timed 
out."));
-                                       }
+                                       closeResourceManagerConnection(
+                                               new TimeoutException(
+                                                       "The heartbeat of 
ResourceManager with id " + resourceId + " timed out."));
                                }
                        });
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index e7f2439..72925bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -53,6 +53,7 @@ import 
org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.TestingResourceManager;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -76,7 +77,7 @@ import static org.mockito.Mockito.verify;
 /**
  * General tests for the resource manager component.
  */
-public class ResourceManagerTest {
+public class ResourceManagerTest extends TestLogger {
 
        private static ActorSystem system;
 
@@ -393,8 +394,7 @@ public class ResourceManagerTest {
 
                try {
                        final StandaloneResourceManager resourceManager = new 
StandaloneResourceManager(
-                               resourceManagerResourceID,
-                               rpcService,
+                               rpcService, resourceManagerResourceID,
                                resourceManagerConfiguration,
                                highAvailabilityServices,
                                heartbeatServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
new file mode 100644
index 0000000..e628db5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+
+public class TestingHeartbeatServices extends HeartbeatServices {
+
+       private final ScheduledExecutor scheduledExecutorToUse;
+
+       public TestingHeartbeatServices(long heartbeatInterval, long 
heartbeatTimeout, ScheduledExecutor scheduledExecutorToUse) {
+               super(heartbeatInterval, heartbeatTimeout);
+
+               this.scheduledExecutorToUse = 
Preconditions.checkNotNull(scheduledExecutorToUse);
+       }
+
+       @Override
+       public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
+               ResourceID resourceId,
+               HeartbeatListener<I, O> heartbeatListener,
+               ScheduledExecutor scheduledExecutor,
+               Logger log) {
+
+               return new HeartbeatManagerSenderImpl<>(
+                       heartbeatInterval,
+                       heartbeatTimeout,
+                       resourceId,
+                       heartbeatListener,
+                       
org.apache.flink.runtime.concurrent.Executors.directExecutor(),
+                       scheduledExecutorToUse,
+                       log);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 73da244..ee8f51d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -26,7 +26,8 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
-import org.apache.flink.runtime.heartbeat.*;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
@@ -49,8 +50,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(BlobLibraryCacheManager.class)
@@ -139,4 +143,6 @@ public class JobMasterTest extends TestLogger {
                        rpc.stopService();
                }
        }
+
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 39594df..c8e209d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -68,8 +68,7 @@ public class ResourceManagerHATest {
 
                final ResourceManager resourceManager =
                        new StandaloneResourceManager(
-                               rmResourceId,
-                               rpcService,
+                               rpcService, rmResourceId,
                                resourceManagerConfiguration,
                                highAvailabilityServices,
                                heartbeatServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 0401f9e..32b40ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -45,7 +46,7 @@ import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.*;
 
-public class ResourceManagerJobMasterTest {
+public class ResourceManagerJobMasterTest extends TestLogger {
 
        private TestingSerialRpcService rpcService;
 
@@ -216,8 +217,7 @@ public class ResourceManagerJobMasterTest {
                        Time.minutes(5L));
 
                ResourceManager resourceManager = new StandaloneResourceManager(
-                       rmResourceId,
-                       rpcService,
+                       rpcService, rmResourceId,
                        resourceManagerConfiguration,
                        highAvailabilityServices,
                        heartbeatServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 7c811d9..cb0a414 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -43,7 +44,7 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
-public class ResourceManagerTaskExecutorTest {
+public class ResourceManagerTaskExecutorTest extends TestLogger {
 
        private TestingSerialRpcService rpcService;
 
@@ -148,7 +149,7 @@ public class ResourceManagerTaskExecutorTest {
 
        private StandaloneResourceManager 
createAndStartResourceManager(TestingLeaderElectionService 
rmLeaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception {
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
-               HeartbeatServices heartbeatServices = 
mock(HeartbeatServices.class);
+               HeartbeatServices heartbeatServices = new HeartbeatServices(5L, 
5L);
                
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
                TestingSlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
                ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
@@ -163,8 +164,8 @@ public class ResourceManagerTaskExecutorTest {
 
                StandaloneResourceManager resourceManager =
                        new StandaloneResourceManager(
-                               resourceManagerResourceID,
                                rpcService,
+                               resourceManagerResourceID,
                                resourceManagerConfiguration,
                                highAvailabilityServices,
                                heartbeatServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 28ed697..68aff42 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -237,8 +237,7 @@ public class SlotProtocolTest extends TestLogger {
 
                ResourceManager<ResourceID> resourceManager =
                        Mockito.spy(new StandaloneResourceManager(
-                               rmResourceId,
-                               testRpcService,
+                               testRpcService, rmResourceId,
                                resourceManagerConfiguration,
                                testingHaServices,
                                heartbeatServices,
@@ -325,8 +324,7 @@ public class SlotProtocolTest extends TestLogger {
                                JobLeaderIdService jobLeaderIdService,
                                FatalErrorHandler fatalErrorHandler) {
                        super(
-                               resourceId,
-                               rpcService,
+                               rpcService, resourceId,
                                resourceManagerConfiguration,
                                highAvailabilityServices,
                                heartbeatServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 4e76486..1789ace 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -120,8 +120,7 @@ public class TaskExecutorITCase {
                final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
 
                ResourceManager<ResourceID> resourceManager = new 
StandaloneResourceManager(
-                       rmResourceId,
-                       rpcService,
+                       rpcService, rmResourceId,
                        resourceManagerConfiguration,
                        testingHAServices,
                        heartbeatServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index d1f6e2e..330d4fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -224,13 +224,19 @@ public class TaskExecutorTest extends TestLogger {
                ResourceManagerGateway rmGateway = 
mock(ResourceManagerGateway.class);
                when(rmGateway.registerTaskExecutor(
                        any(UUID.class), anyString(), any(ResourceID.class), 
any(SlotReport.class), any(Time.class)))
-                       
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(
-                                       new TaskExecutorRegistrationSuccess(new 
InstanceID(), rmResourceId, 10L)));
+                       .thenReturn(
+                               
FlinkCompletableFuture.<RegistrationResponse>completed(
+                                       new TaskExecutorRegistrationSuccess(
+                                               new InstanceID(),
+                                               rmResourceId,
+                                               10L)));
 
                final TestingSerialRpcService rpc = new 
TestingSerialRpcService();
                rpc.registerGateway(rmAddress, rmGateway);
 
-               final TestingLeaderRetrievalService testLeaderService = new 
TestingLeaderRetrievalService();
+               final TestingLeaderRetrievalService testLeaderService = new 
TestingLeaderRetrievalService(
+                       null,
+                       null);
                final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
                haServices.setResourceManagerLeaderRetriever(testLeaderService);
 
@@ -292,11 +298,11 @@ public class TaskExecutorTest extends TestLogger {
                        testLeaderService.notifyListener(rmAddress, rmLeaderId);
 
                        // register resource manager success will trigger 
monitoring heartbeat target between tm and rm
-                       verify(rmGateway).registerTaskExecutor(
+                       verify(rmGateway, atLeast(1)).registerTaskExecutor(
                                        eq(rmLeaderId), 
eq(taskManager.getAddress()), eq(tmResourceId), any(SlotReport.class), 
any(Time.class));
 
                        // heartbeat timeout should trigger disconnect 
TaskManager from ResourceManager
-                       verify(rmGateway, timeout(heartbeatTimeout * 
5)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), 
any(TimeoutException.class));
+                       verify(rmGateway, timeout(heartbeatTimeout * 
50L)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), 
any(TimeoutException.class));
 
                        // check if a concurrent error occurred
                        testingFatalErrorHandler.rethrowError();

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties
index 98f136a..7ba1633 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 
-log4j.rootLogger=INFO, console
+log4j.rootLogger=OFF, console
 
 # -----------------------------------------------------------------------------
 # Console (use 'console')

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index ed672a3..21e6e45 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -199,10 +199,9 @@ public class YarnFlinkApplicationMasterRunner extends 
AbstractYarnFlinkApplicati
                        commonRpcService.getScheduledExecutor());
 
                return new YarnResourceManager(
-                       ResourceID.generate(),
+                       commonRpcService, ResourceID.generate(),
                        config,
                        ENV,
-                       commonRpcService,
                        resourceManagerConfiguration,
                        haServices,
                        heartbeatServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index a308079..f8cf275 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -107,10 +107,10 @@ public class YarnResourceManager extends 
ResourceManager<ResourceID> implements
        final private Map<ResourceProfile, Integer> resourcePriorities = new 
HashMap<>();
 
        public YarnResourceManager(
+                       RpcService rpcService,
                        ResourceID resourceId,
                        Configuration flinkConfig,
                        Map<String, String> env,
-                       RpcService rpcService,
                        ResourceManagerConfiguration 
resourceManagerConfiguration,
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
@@ -119,8 +119,8 @@ public class YarnResourceManager extends 
ResourceManager<ResourceID> implements
                        JobLeaderIdService jobLeaderIdService,
                        FatalErrorHandler fatalErrorHandler) {
                super(
-                       resourceId,
                        rpcService,
+                       resourceId,
                        resourceManagerConfiguration,
                        highAvailabilityServices,
                        heartbeatServices,

Reply via email to