[FLINK-4354] [heartbeat] Add heartbeats between the ResourceManager and TaskExecutor
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/83b99f8a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/83b99f8a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/83b99f8a Branch: refs/heads/table-retraction Commit: 83b99f8a624ddf35deb934b4d4358582657998c6 Parents: fd90672 Author: Till Rohrmann <trohrm...@apache.org> Authored: Wed Mar 22 12:03:45 2017 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Thu Mar 23 13:58:44 2017 +0100 ---------------------------------------------------------------------- .../heartbeat/TestingHeartbeatManagerImpl.java | 63 ----------------- .../TestingHeartbeatManagerSenderImpl.java | 61 ----------------- .../heartbeat/TestingHeartbeatServices.java | 52 -------------- .../flink/runtime/jobmaster/JobMaster.java | 4 +- .../flink/runtime/minicluster/MiniCluster.java | 7 +- .../resourcemanager/ResourceManager.java | 71 +++++++++++--------- .../resourcemanager/ResourceManagerGateway.java | 7 +- .../resourcemanager/ResourceManagerRunner.java | 2 +- .../StandaloneResourceManager.java | 4 +- .../runtime/taskexecutor/TaskExecutor.java | 63 ++++++++++------- .../clusterframework/ResourceManagerTest.java | 6 +- .../heartbeat/TestingHeartbeatServices.java | 52 ++++++++++++++ .../flink/runtime/jobmaster/JobMasterTest.java | 10 ++- .../resourcemanager/ResourceManagerHATest.java | 3 +- .../ResourceManagerJobMasterTest.java | 6 +- .../ResourceManagerTaskExecutorTest.java | 7 +- .../slotmanager/SlotProtocolTest.java | 6 +- .../taskexecutor/TaskExecutorITCase.java | 3 +- .../runtime/taskexecutor/TaskExecutorTest.java | 16 +++-- .../src/test/resources/log4j-test.properties | 2 +- .../yarn/YarnFlinkApplicationMasterRunner.java | 3 +- .../apache/flink/yarn/YarnResourceManager.java | 4 +- 22 files changed, 181 insertions(+), 271 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java deleted file mode 100644 index a6e056d..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.heartbeat; - -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.concurrent.ScheduledExecutor; -import org.slf4j.Logger; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; - -/** - * Heartbeat manager implementation which extends {@link HeartbeatManagerImpl} for testing. - * It overrides the {@link #unmonitorTarget(ResourceID)} to wait for some tests complete - * when notify heartbeat timeout. - * - * @param <I> Type of the incoming heartbeat payload - * @param <O> Type of the outgoing heartbeat payload - */ -public class TestingHeartbeatManagerImpl<I, O> extends HeartbeatManagerImpl<I, O> { - - private final CountDownLatch waitLatch; - - public TestingHeartbeatManagerImpl( - CountDownLatch waitLatch, - long heartbeatTimeoutIntervalMs, - ResourceID ownResourceID, - HeartbeatListener<I, O> heartbeatListener, - Executor executor, - ScheduledExecutor scheduledExecutor, - Logger log) { - - super(heartbeatTimeoutIntervalMs, ownResourceID, heartbeatListener, executor, scheduledExecutor, log); - - this.waitLatch = waitLatch; - } - - @Override - public void unmonitorTarget(ResourceID resourceID) { - try { - waitLatch.await(); - } catch (InterruptedException ex) { - log.error("Unexpected interrupted exception.", ex); - } - - super.unmonitorTarget(resourceID); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java deleted file mode 100644 index 36f7e96..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.heartbeat; - -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.concurrent.ScheduledExecutor; -import org.slf4j.Logger; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; - -/** - * - * @param <I> - * @param <O> - */ -public class TestingHeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerSenderImpl<I, O> { - - private final CountDownLatch waitLatch; - - public TestingHeartbeatManagerSenderImpl( - CountDownLatch waitLatch, - long heartbeatPeriod, - long heartbeatTimeout, - ResourceID ownResourceID, - HeartbeatListener<I, O> heartbeatListener, - Executor executor, - ScheduledExecutor scheduledExecutor, - Logger log) { - - super(heartbeatPeriod, heartbeatTimeout, ownResourceID, heartbeatListener, executor, scheduledExecutor, log); - - this.waitLatch = waitLatch; - } - - @Override - public void unmonitorTarget(ResourceID resourceID) { - try { - waitLatch.await(); - } catch (InterruptedException ex) { - log.error("Unexpected interrupted exception.", ex); - } - - super.unmonitorTarget(resourceID); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java deleted file mode 100644 index e628db5..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.heartbeat; - -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.concurrent.ScheduledExecutor; -import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; - -public class TestingHeartbeatServices extends HeartbeatServices { - - private final ScheduledExecutor scheduledExecutorToUse; - - public TestingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout, ScheduledExecutor scheduledExecutorToUse) { - super(heartbeatInterval, heartbeatTimeout); - - this.scheduledExecutorToUse = Preconditions.checkNotNull(scheduledExecutorToUse); - } - - @Override - public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender( - ResourceID resourceId, - HeartbeatListener<I, O> heartbeatListener, - ScheduledExecutor scheduledExecutor, - Logger log) { - - return new HeartbeatManagerSenderImpl<>( - heartbeatInterval, - heartbeatTimeout, - resourceId, - heartbeatListener, - org.apache.flink.runtime.concurrent.Executors.directExecutor(), - scheduledExecutorToUse, - log); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 81fc541..080b48e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -1043,11 +1043,11 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { @Override public void notifyHeartbeatTimeout(ResourceID resourceID) { - log.info("Task manager with id {} heartbeat timed out.", resourceID); + log.info("Heartbeat of TaskManager with id {} timed out.", resourceID); getSelf().disconnectTaskManager( resourceID, - new TimeoutException("Task manager with id " + resourceID + " heartbeat timed out.")); + new TimeoutException("Heartbeat of TaskManager with id " + resourceID + " timed out.")); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 2cfba7b..9d5f9d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -241,7 +241,12 @@ public class MiniCluster { // bring up the ResourceManager(s) LOG.info("Starting {} ResourceManger(s)", numResourceManagers); resourceManagerRunners = startResourceManagers( - configuration, haServices, heartbeatServices, metricRegistry, numResourceManagers, resourceManagerRpcServices); + configuration, + haServices, + heartbeatServices, + metricRegistry, + numResourceManagers, + resourceManagerRpcServices); // bring up the TaskManager(s) for the mini cluster LOG.info("Starting {} TaskManger(s)", numTaskManagers); http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 9a7a790..5467177 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -64,6 +64,7 @@ import org.apache.flink.util.ExceptionUtils; import java.io.Serializable; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -129,8 +130,8 @@ public abstract class ResourceManager<WorkerType extends Serializable> private ConcurrentMap<String, InfoMessageListenerRpcGateway> infoMessageListeners; public ResourceManager( - ResourceID resourceId, RpcService rpcService, + ResourceID resourceId, ResourceManagerConfiguration resourceManagerConfiguration, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, @@ -359,7 +360,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> final ResourceID taskExecutorResourceId, final SlotReport slotReport) { - if (leaderSessionId.equals(resourceManagerLeaderId)) { + if (Objects.equals(leaderSessionId, resourceManagerLeaderId)) { Future<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class); return taskExecutorGatewayFuture.handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() { @@ -384,7 +385,8 @@ public abstract class ResourceManager<WorkerType extends Serializable> taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() { @Override public void receiveHeartbeat(ResourceID resourceID, Void payload) { - // the task manager will not request heartbeat, so this method will never be called currently + // the ResourceManager will always send heartbeat requests to the + // TaskManager } @Override @@ -394,7 +396,8 @@ public abstract class ResourceManager<WorkerType extends Serializable> }); return new TaskExecutorRegistrationSuccess( - registration.getInstanceID(), resourceId, + registration.getInstanceID(), + resourceId, resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds()); } } @@ -607,6 +610,30 @@ public abstract class ResourceManager<WorkerType extends Serializable> } /** + * This method should be called by the framework once it detects that a currently registered + * task executor has failed. + * + * @param resourceID Id of the TaskManager that has failed. + * @param cause The exception which cause the TaskManager failed. + */ + protected void closeTaskManagerConnection(final ResourceID resourceID, final Exception cause) { + taskManagerHeartbeatManager.unmonitorTarget(resourceID); + + WorkerRegistration<WorkerType> workerRegistration = taskExecutors.remove(resourceID); + + if (workerRegistration != null) { + log.info("Task manager {} failed because {}.", resourceID, cause); + + // TODO :: suggest failed task executor to stop itself + slotManager.notifyTaskManagerFailure(resourceID); + + workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause); + } else { + log.debug("Could not find a registered task manager with the process id {}.", resourceID); + } + } + + /** * Checks whether the given resource manager leader id is matching the current leader id and * not null. * @@ -756,30 +783,6 @@ public abstract class ResourceManager<WorkerType extends Serializable> onFatalErrorAsync(new ResourceManagerException("Received an error from the LeaderElectionService.", exception)); } - /** - * This method should be called by the framework once it detects that a currently registered - * task executor has failed. - * - * @param resourceID Id of the TaskManager that has failed. - * @param cause The exception which cause the TaskManager failed. - */ - public void closeTaskManagerConnection(final ResourceID resourceID, final Exception cause) { - taskManagerHeartbeatManager.unmonitorTarget(resourceID); - - WorkerRegistration<WorkerType> workerRegistration = taskExecutors.remove(resourceID); - - if (workerRegistration != null) { - log.info("Task manager {} failed because {}.", resourceID, cause); - - // TODO :: suggest failed task executor to stop itself - slotManager.notifyTaskManagerFailure(resourceID); - - workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause); - } else { - log.debug("Could not find a registered task manager with the process id {}.", resourceID); - } - } - // ------------------------------------------------------------------------ // Framework specific behavior // ------------------------------------------------------------------------ @@ -875,11 +878,17 @@ public abstract class ResourceManager<WorkerType extends Serializable> private class TaskManagerHeartbeatListener implements HeartbeatListener<Void, Void> { @Override - public void notifyHeartbeatTimeout(ResourceID resourceID) { + public void notifyHeartbeatTimeout(final ResourceID resourceID) { log.info("The heartbeat of TaskManager with id {} timed out.", resourceID); - closeTaskManagerConnection(resourceID, new TimeoutException( - "Task manager with id " + resourceID + " heartbeat timed out.")); + runAsync(new Runnable() { + @Override + public void run() { + closeTaskManagerConnection( + resourceID, + new TimeoutException("Task manager with id " + resourceID + " heartbeat timed out.")); + } + }); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index 7741e0d..cda4a7c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -134,13 +134,12 @@ public interface ResourceManagerGateway extends RpcGateway { /** * Sends the heartbeat to resource manager from task manager * - * @param resourceID unique id of the task manager + * @param heartbeatOrigin unique id of the task manager */ - void heartbeatFromTaskManager(final ResourceID resourceID); + void heartbeatFromTaskManager(final ResourceID heartbeatOrigin); /** - * Disconnects the given {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} from the - * {@link ResourceManager}. + * Disconnects a TaskManager specified by the given resourceID from the {@link ResourceManager}. * * @param resourceID identifying the TaskManager to disconnect * @param cause for the disconnection of the TaskManager http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java index d07e373..3a8baa6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java @@ -69,8 +69,8 @@ public class ResourceManagerRunner implements FatalErrorHandler { rpcService.getScheduledExecutor()); this.resourceManager = new StandaloneResourceManager( - resourceId, rpcService, + resourceId, resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java index e2d6538..fd5a001 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java @@ -38,8 +38,8 @@ import org.apache.flink.runtime.rpc.RpcService; public class StandaloneResourceManager extends ResourceManager<ResourceID> { public StandaloneResourceManager( - ResourceID resourceId, RpcService rpcService, + ResourceID resourceId, ResourceManagerConfiguration resourceManagerConfiguration, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, @@ -48,8 +48,8 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> { JobLeaderIdService jobLeaderIdService, FatalErrorHandler fatalErrorHandler) { super( - resourceId, rpcService, + resourceId, resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index f3e1ff3..4883e7d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -696,17 +696,35 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { } } + private void establishResourceManagerConnection(ResourceID resourceManagerResourceId) { + // monitor the resource manager as heartbeat target + resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<Void>() { + @Override + public void receiveHeartbeat(ResourceID resourceID, Void payload) { + ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway(); + resourceManagerGateway.heartbeatFromTaskManager(resourceID); + } + + @Override + public void requestHeartbeat(ResourceID resourceID, Void payload) { + // the TaskManager won't send heartbeat requests to the ResourceManager + } + }); + } + private void closeResourceManagerConnection(Exception cause) { - log.info("Close ResourceManager connection for {}.", cause); + validateRunsInMainThread(); if (isConnectedToResourceManager()) { + log.info("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerId(), cause); + resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId()); ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway(); + resourceManagerGateway.disconnectTaskManager(getResourceID(), cause); + resourceManagerConnection.close(); resourceManagerConnection = null; - - resourceManagerGateway.disconnectTaskManager(getResourceID(), cause); } } @@ -790,7 +808,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { "and returning them to the ResourceManager.", throwable); // We encountered an exception. Free the slots and return them to the RM. - for (SlotOffer reservedSlot : reservedSlots) { + for (SlotOffer reservedSlot: reservedSlots) { freeSlot(reservedSlot.getAllocationId(), throwable); } } @@ -841,6 +859,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { } private void closeJobManagerConnection(JobID jobId, Exception cause) { + validateRunsInMainThread(); + log.info("Close JobManager connection for job {}.", jobId); // 1. fail tasks running under this JobID @@ -1183,21 +1203,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { public void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) { final ResourceID resourceManagerId = success.getResourceManagerId(); - // monitor the resource manager as heartbeat target - resourceManagerHeartbeatManager.monitorTarget(resourceManagerId, new HeartbeatTarget<Void>() { - @Override - public void receiveHeartbeat(ResourceID resourceID, Void payload) { - if (isConnectedToResourceManager()) { - ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway(); - resourceManagerGateway.heartbeatFromTaskManager(resourceID); + runAsync( + new Runnable() { + @Override + public void run() { + establishResourceManagerConnection(resourceManagerId); } } - - @Override - public void requestHeartbeat(ResourceID resourceID, Void payload) { - // request heartbeat will never be called on the task manager side - } - }); + ); } @Override @@ -1277,14 +1290,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { runAsync(new Runnable() { @Override public void run() { - log.info("Job manager with id {} heartbeat timed out.", resourceID); + log.info("The heartbeat of JobManager with id {} timed out.", resourceID); if (jobManagerConnections.containsKey(resourceID)) { JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID); + if (jobManagerConnection != null) { closeJobManagerConnection( jobManagerConnection.getJobID(), - new TimeoutException("Job manager with id " + resourceID + " heartbeat timed out.")); + new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out.")); } } } @@ -1305,16 +1319,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, Void> { @Override - public void notifyHeartbeatTimeout(final ResourceID resourceID) { + public void notifyHeartbeatTimeout(final ResourceID resourceId) { runAsync(new Runnable() { @Override public void run() { - log.info("Resource manager with id {} heartbeat timed out.", resourceID); + log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId); - if (isConnectedToResourceManager() && resourceManagerConnection.getResourceManagerId().equals(resourceID)) { - closeResourceManagerConnection( - new TimeoutException("Resource manager with id " + resourceID + " heartbeat timed out.")); - } + closeResourceManagerConnection( + new TimeoutException( + "The heartbeat of ResourceManager with id " + resourceId + " timed out.")); } }); } http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java index e7f2439..72925bb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java @@ -53,6 +53,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.TestingResourceManager; import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -76,7 +77,7 @@ import static org.mockito.Mockito.verify; /** * General tests for the resource manager component. */ -public class ResourceManagerTest { +public class ResourceManagerTest extends TestLogger { private static ActorSystem system; @@ -393,8 +394,7 @@ public class ResourceManagerTest { try { final StandaloneResourceManager resourceManager = new StandaloneResourceManager( - resourceManagerResourceID, - rpcService, + rpcService, resourceManagerResourceID, resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java new file mode 100644 index 0000000..e628db5 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.heartbeat; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; + +public class TestingHeartbeatServices extends HeartbeatServices { + + private final ScheduledExecutor scheduledExecutorToUse; + + public TestingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout, ScheduledExecutor scheduledExecutorToUse) { + super(heartbeatInterval, heartbeatTimeout); + + this.scheduledExecutorToUse = Preconditions.checkNotNull(scheduledExecutorToUse); + } + + @Override + public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender( + ResourceID resourceId, + HeartbeatListener<I, O> heartbeatListener, + ScheduledExecutor scheduledExecutor, + Logger log) { + + return new HeartbeatManagerSenderImpl<>( + heartbeatInterval, + heartbeatTimeout, + resourceId, + heartbeatListener, + org.apache.flink.runtime.concurrent.Executors.directExecutor(), + scheduledExecutorToUse, + log); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 73da244..ee8f51d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -26,7 +26,8 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; -import org.apache.flink.runtime.heartbeat.*; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; @@ -49,8 +50,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; @RunWith(PowerMockRunner.class) @PrepareForTest(BlobLibraryCacheManager.class) @@ -139,4 +143,6 @@ public class JobMasterTest extends TestLogger { rpc.stopService(); } } + + } http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java index 39594df..c8e209d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java @@ -68,8 +68,7 @@ public class ResourceManagerHATest { final ResourceManager resourceManager = new StandaloneResourceManager( - rmResourceId, - rpcService, + rpcService, rmResourceId, resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java index 0401f9e..32b40ac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.TestingSerialRpcService; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.util.TestLogger; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -45,7 +46,7 @@ import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.*; -public class ResourceManagerJobMasterTest { +public class ResourceManagerJobMasterTest extends TestLogger { private TestingSerialRpcService rpcService; @@ -216,8 +217,7 @@ public class ResourceManagerJobMasterTest { Time.minutes(5L)); ResourceManager resourceManager = new StandaloneResourceManager( - rmResourceId, - rpcService, + rpcService, rmResourceId, resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java index 7c811d9..cb0a414 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.util.TestLogger; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -43,7 +44,7 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; -public class ResourceManagerTaskExecutorTest { +public class ResourceManagerTaskExecutorTest extends TestLogger { private TestingSerialRpcService rpcService; @@ -148,7 +149,7 @@ public class ResourceManagerTaskExecutorTest { private StandaloneResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception { TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); - HeartbeatServices heartbeatServices = mock(HeartbeatServices.class); + HeartbeatServices heartbeatServices = new HeartbeatServices(5L, 5L); highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( @@ -163,8 +164,8 @@ public class ResourceManagerTaskExecutorTest { StandaloneResourceManager resourceManager = new StandaloneResourceManager( - resourceManagerResourceID, rpcService, + resourceManagerResourceID, resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index 28ed697..68aff42 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -237,8 +237,7 @@ public class SlotProtocolTest extends TestLogger { ResourceManager<ResourceID> resourceManager = Mockito.spy(new StandaloneResourceManager( - rmResourceId, - testRpcService, + testRpcService, rmResourceId, resourceManagerConfiguration, testingHaServices, heartbeatServices, @@ -325,8 +324,7 @@ public class SlotProtocolTest extends TestLogger { JobLeaderIdService jobLeaderIdService, FatalErrorHandler fatalErrorHandler) { super( - resourceId, - rpcService, + rpcService, resourceId, resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 4e76486..1789ace 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -120,8 +120,7 @@ public class TaskExecutorITCase { final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); ResourceManager<ResourceID> resourceManager = new StandaloneResourceManager( - rmResourceId, - rpcService, + rpcService, rmResourceId, resourceManagerConfiguration, testingHAServices, heartbeatServices, http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index d1f6e2e..330d4fb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -224,13 +224,19 @@ public class TaskExecutorTest extends TestLogger { ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class); when(rmGateway.registerTaskExecutor( any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class))) - .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed( - new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId, 10L))); + .thenReturn( + FlinkCompletableFuture.<RegistrationResponse>completed( + new TaskExecutorRegistrationSuccess( + new InstanceID(), + rmResourceId, + 10L))); final TestingSerialRpcService rpc = new TestingSerialRpcService(); rpc.registerGateway(rmAddress, rmGateway); - final TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(); + final TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService( + null, + null); final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); haServices.setResourceManagerLeaderRetriever(testLeaderService); @@ -292,11 +298,11 @@ public class TaskExecutorTest extends TestLogger { testLeaderService.notifyListener(rmAddress, rmLeaderId); // register resource manager success will trigger monitoring heartbeat target between tm and rm - verify(rmGateway).registerTaskExecutor( + verify(rmGateway, atLeast(1)).registerTaskExecutor( eq(rmLeaderId), eq(taskManager.getAddress()), eq(tmResourceId), any(SlotReport.class), any(Time.class)); // heartbeat timeout should trigger disconnect TaskManager from ResourceManager - verify(rmGateway, timeout(heartbeatTimeout * 5)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), any(TimeoutException.class)); + verify(rmGateway, timeout(heartbeatTimeout * 50L)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), any(TimeoutException.class)); // check if a concurrent error occurred testingFatalErrorHandler.rethrowError(); http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties index 98f136a..7ba1633 100644 --- a/flink-runtime/src/test/resources/log4j-test.properties +++ b/flink-runtime/src/test/resources/log4j-test.properties @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################ -log4j.rootLogger=INFO, console +log4j.rootLogger=OFF, console # ----------------------------------------------------------------------------- # Console (use 'console') http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java index ed672a3..21e6e45 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java @@ -199,10 +199,9 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati commonRpcService.getScheduledExecutor()); return new YarnResourceManager( - ResourceID.generate(), + commonRpcService, ResourceID.generate(), config, ENV, - commonRpcService, resourceManagerConfiguration, haServices, heartbeatServices, http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index a308079..f8cf275 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -107,10 +107,10 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements final private Map<ResourceProfile, Integer> resourcePriorities = new HashMap<>(); public YarnResourceManager( + RpcService rpcService, ResourceID resourceId, Configuration flinkConfig, Map<String, String> env, - RpcService rpcService, ResourceManagerConfiguration resourceManagerConfiguration, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, @@ -119,8 +119,8 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements JobLeaderIdService jobLeaderIdService, FatalErrorHandler fatalErrorHandler) { super( - resourceId, rpcService, + resourceId, resourceManagerConfiguration, highAvailabilityServices, heartbeatServices,