Repository: flink Updated Branches: refs/heads/master bbac4a6c9 -> 40656c5df
[FLINK-7077] [mesos] Implement task release to support dynamic scaling - SlotManager: fix for idleness tracking (`markIdle` shouldn't reset `idleSince` on every call) - ResourceManager: change `stopWorker` method to use `ResourceID` - ResourceManager: schedule callbacks from `ResourceManagerActions` onto main thread - MesosResourceManager: implement `stopWorker` - MesosResourceManager: fix for message routing from child actors to RM This closes #4560. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40656c5d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40656c5d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40656c5d Branch: refs/heads/master Commit: 40656c5dfdab7c0a1a7794dfe3a5f661f6156c6f Parents: bbac4a6 Author: Wright, Eron <eron.wri...@emc.com> Authored: Thu Aug 17 18:22:55 2017 -0700 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Sat Aug 19 17:28:01 2017 +0200 ---------------------------------------------------------------------- .../MesosFlinkResourceManager.java | 2 +- .../clusterframework/MesosResourceManager.java | 35 ++++++++++++++--- .../apache/flink/mesos/scheduler/Tasks.scala | 8 ++-- .../MesosResourceManagerTest.java | 41 +++++++++++++++++--- .../flink/mesos/scheduler/TasksTest.scala | 2 +- .../resourcemanager/ResourceManager.java | 35 +++++++++++++++-- .../StandaloneResourceManager.java | 3 +- .../slotmanager/SlotManager.java | 3 ++ .../slotmanager/TaskManagerRegistration.java | 4 +- .../clusterframework/ResourceManagerTest.java | 2 +- .../TestingLeaderElectionService.java | 37 +++++++++++++----- .../apache/flink/yarn/YarnResourceManager.java | 3 +- 12 files changed, 141 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java index 05d7e1f..6335745 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java @@ -192,7 +192,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe protected ActorRef createTaskRouter() { return context().actorOf( - Tasks.createActorProps(Tasks.class, config, schedulerDriver, TaskMonitor.class), + Tasks.createActorProps(Tasks.class, self(), config, schedulerDriver, TaskMonitor.class), "tasks"); } http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java index 445010b..9a2ad42 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java @@ -49,7 +49,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; import org.apache.flink.runtime.resourcemanager.ResourceManager; @@ -197,7 +196,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN protected ActorRef createTaskMonitor(SchedulerDriver schedulerDriver) { return actorSystem.actorOf( - Tasks.createActorProps(Tasks.class, flinkConfig, schedulerDriver, TaskMonitor.class), + Tasks.createActorProps(Tasks.class, selfActor, flinkConfig, schedulerDriver, TaskMonitor.class), "tasks"); } @@ -422,8 +421,34 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN } @Override - public void stopWorker(InstanceID instanceId) { - // TODO implement worker release + public void stopWorker(ResourceID resourceID) { + LOG.info("Stopping worker {}.", resourceID); + try { + + if (workersInLaunch.containsKey(resourceID)) { + // update persistent state of worker to Released + MesosWorkerStore.Worker worker = workersInLaunch.remove(resourceID); + worker = worker.releaseWorker(); + workerStore.putWorker(worker); + workersBeingReturned.put(extractResourceID(worker.taskID()), worker); + + taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor); + + if (worker.hostname().isDefined()) { + // tell the launch coordinator that the task is being unassigned from the host, for planning purposes + launchCoordinator.tell(new LaunchCoordinator.Unassign(worker.taskID(), worker.hostname().get()), selfActor); + } + } + else if (workersBeingReturned.containsKey(resourceID)) { + LOG.info("Ignoring request to stop worker {} because it is already being stopped.", resourceID); + } + else { + LOG.warn("Unrecognized worker {}.", resourceID); + } + } + catch (Exception e) { + onFatalErrorAsync(new ResourceManagerException("Unable to release a worker.", e)); + } } /** @@ -596,8 +621,6 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN assert(launched != null); LOG.info("Worker {} failed with status: {}, reason: {}, message: {}.", id, status.getState(), status.getReason(), status.getMessage()); - - // TODO : launch a replacement worker? } closeTaskManagerConnection(id, new Exception(status.getMessage())); http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala index 4f49c16..54d1bd2 100644 --- a/flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala +++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala @@ -34,6 +34,7 @@ import scala.collection.mutable.{Map => MutableMap} * Routes messages between the scheduler and individual task monitor actors. */ class Tasks( + manager: ActorRef, flinkConfig: Configuration, schedulerDriver: SchedulerDriver, taskMonitorCreator: (ActorRefFactory,TaskGoalState) => ActorRef) extends Actor { @@ -92,11 +93,11 @@ class Tasks( } case msg: Reconcile => - context.parent.forward(msg) + manager.forward(msg) case msg: TaskTerminated => taskMap.remove(msg.taskID) - context.parent.forward(msg) + manager.forward(msg) } private def createTask(task: TaskGoalState): ActorRef = { @@ -113,6 +114,7 @@ object Tasks { */ def createActorProps[T <: Tasks, M <: TaskMonitor]( actorClass: Class[T], + manager: ActorRef, flinkConfig: Configuration, schedulerDriver: SchedulerDriver, taskMonitorClass: Class[M]): Props = { @@ -122,6 +124,6 @@ object Tasks { factory.actorOf(props) } - Props.create(actorClass, flinkConfig, schedulerDriver, taskMonitorCreator) + Props.create(actorClass, manager, flinkConfig, schedulerDriver, taskMonitorCreator) } } http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java index 4bbcb25..cf0c913 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java @@ -78,6 +78,7 @@ import org.apache.mesos.SchedulerDriver; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; @@ -330,9 +331,9 @@ public class MesosResourceManagerTest extends TestLogger { when(slotManager.registerSlotRequest(any(SlotRequest.class))).thenReturn(true); } - public void grantLeadership() { + public void grantLeadership() throws Exception { rmLeaderSessionId = UUID.randomUUID(); - rmLeaderElectionService.isLeader(rmLeaderSessionId); + rmLeaderElectionService.isLeader(rmLeaderSessionId).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); } } @@ -454,7 +455,7 @@ public class MesosResourceManagerTest extends TestLogger { MesosWorkerStore.Worker expected = MesosWorkerStore.Worker.newWorker(taskID, resourceProfile); // drain the probe messages - verify(rmServices.workerStore).putWorker(expected); + verify(rmServices.workerStore, Mockito.timeout(timeout.toMilliseconds())).putWorker(expected); assertThat(resourceManager.workersInNew, hasEntry(extractResourceID(taskID), expected)); resourceManager.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class); resourceManager.launchCoordinator.expectMsgClass(LaunchCoordinator.Launch.class); @@ -531,7 +532,7 @@ public class MesosResourceManagerTest extends TestLogger { // verify that a new worker was persisted, the internal state was updated, the task router was notified, // and the launch coordinator was asked to launch a task MesosWorkerStore.Worker expected = MesosWorkerStore.Worker.newWorker(task1, resourceProfile1); - verify(rmServices.workerStore).putWorker(expected); + verify(rmServices.workerStore, Mockito.timeout(timeout.toMilliseconds())).putWorker(expected); assertThat(resourceManager.workersInNew, hasEntry(extractResourceID(task1), expected)); resourceManager.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class); resourceManager.launchCoordinator.expectMsgClass(LaunchCoordinator.Launch.class); @@ -617,7 +618,7 @@ public class MesosResourceManagerTest extends TestLogger { // send registration message CompletableFuture<RegistrationResponse> successfulFuture = resourceManager.registerTaskExecutor(rmServices.rmLeaderSessionId, task1Executor.address, task1Executor.resourceID, slotReport, timeout); - RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS); + RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); assertTrue(response instanceof TaskExecutorRegistrationSuccess); // verify the internal state @@ -653,6 +654,36 @@ public class MesosResourceManagerTest extends TestLogger { } /** + * Test planned stop of a launched worker. + */ + @Test + public void testStopWorker() throws Exception { + new Context() {{ + // set the initial persistent state with a launched worker + MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host); + when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1)); + when(rmServices.workerStore.recoverWorkers()).thenReturn(singletonList(worker1launched)); + startResourceManager(); + + // drain the assign message + resourceManager.launchCoordinator.expectMsgClass(LaunchCoordinator.Assign.class); + + // tell the RM to stop the worker + resourceManager.stopWorker(extractResourceID(task1)); + + // verify that the instance state was updated + MesosWorkerStore.Worker worker1Released = worker1launched.releaseWorker(); + verify(rmServices.workerStore).putWorker(worker1Released); + assertThat(resourceManager.workersInLaunch.entrySet(), empty()); + assertThat(resourceManager.workersBeingReturned, hasEntry(extractResourceID(task1), worker1Released)); + + // verify that the monitor was notified + resourceManager.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class); + resourceManager.launchCoordinator.expectMsgClass(LaunchCoordinator.Unassign.class); + }}; + } + + /** * Test application shutdown handling. */ @Test http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TasksTest.scala ---------------------------------------------------------------------- diff --git a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TasksTest.scala b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TasksTest.scala index fcf2977..b3d9a5f 100644 --- a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TasksTest.scala +++ b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TasksTest.scala @@ -93,7 +93,7 @@ class TasksTest taskActorRef } TestActorRef[Tasks]( - Props(classOf[Tasks], config, schedulerDriver, taskActorCreator), + Props(classOf[Tasks], testActor, config, schedulerDriver, taskActorCreator), testActor, TestFSMUtils.randomName) } http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index e8ec0e0..a9a9e50 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -944,7 +944,12 @@ public abstract class ResourceManager<WorkerType extends Serializable> @VisibleForTesting public abstract void startNewWorker(ResourceProfile resourceProfile); - public abstract void stopWorker(InstanceID instanceId); + /** + * Deallocates a resource. + * + * @param resourceID The resource ID + */ + public abstract void stopWorker(ResourceID resourceID); /** * Callback when a worker was started. @@ -960,12 +965,36 @@ public abstract class ResourceManager<WorkerType extends Serializable> @Override public void releaseResource(InstanceID instanceId) { - stopWorker(instanceId); + runAsync(new Runnable() { + @Override + public void run() { + ResourceID resourceID = null; + + for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> entry : taskExecutors.entrySet()) { + if (entry.getValue().getInstanceID().equals(instanceId)) { + resourceID = entry.getKey(); + break; + } + } + + if (resourceID != null) { + stopWorker(resourceID); + } + else { + log.warn("Ignoring request to release TaskManager with instance ID {} (not found).", instanceId); + } + } + }); } @Override public void allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException { - startNewWorker(resourceProfile); + runAsync(new Runnable() { + @Override + public void run() { + startNewWorker(resourceProfile); + } + }); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java index a921a29..ac2c745 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java @@ -23,7 +23,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; @@ -76,7 +75,7 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> { } @Override - public void stopWorker(InstanceID instanceId) { + public void stopWorker(ResourceID resourceID) { } http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index 3bda409..5218286 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -836,10 +836,13 @@ public class SlotManager implements AutoCloseable { while (taskManagerRegistrationIterator.hasNext()) { TaskManagerRegistration taskManagerRegistration = taskManagerRegistrationIterator.next().getValue(); + LOG.debug("Evaluating TaskManager {} for idleness.", taskManagerRegistration.getInstanceId()); if (anySlotUsed(taskManagerRegistration.getSlots())) { taskManagerRegistration.markUsed(); } else if (currentTime - taskManagerRegistration.getIdleSince() >= taskManagerTimeout.toMilliseconds()) { + LOG.info("Removing idle TaskManager {} from the SlotManager.", taskManagerRegistration.getInstanceId()); + taskManagerRegistrationIterator.remove(); internalUnregisterTaskManager(taskManagerRegistration); http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java index 7d3764c..f19f9bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java @@ -68,7 +68,9 @@ public class TaskManagerRegistration { } public void markIdle() { - idleSince = System.currentTimeMillis(); + if (!isIdle()) { + idleSince = System.currentTimeMillis(); + } } public void markUsed() { http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java index 9ad251b..737cede 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java @@ -629,7 +629,7 @@ public class ResourceManagerTest extends TestLogger { final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); - rmLeaderElectionService.isLeader(rmLeaderId); + rmLeaderElectionService.isLeader(rmLeaderId).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); // test registration response successful and it will trigger monitor heartbeat target, schedule heartbeat request at interval time CompletableFuture<RegistrationResponse> successfulFuture = rmGateway.registerJobManager( http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java index d456083..d951db5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.leaderelection; import java.util.UUID; +import java.util.concurrent.CompletableFuture; /** * Test {@link LeaderElectionService} implementation which directly forwards isLeader and notLeader @@ -28,43 +29,61 @@ public class TestingLeaderElectionService implements LeaderElectionService { private LeaderContender contender; private boolean hasLeadership = false; + private CompletableFuture<UUID> confirmationFuture = null; + + /** + * Gets a future that completes when leadership is confirmed. + * + * <p>Note: the future is created upon calling {@link #isLeader(UUID)}. + */ + public synchronized CompletableFuture<UUID> getConfirmationFuture() { + return confirmationFuture; + } @Override - public void start(LeaderContender contender) throws Exception { + public synchronized void start(LeaderContender contender) throws Exception { this.contender = contender; } @Override - public void stop() throws Exception { + public synchronized void stop() throws Exception { } @Override - public void confirmLeaderSessionID(UUID leaderSessionID) { - + public synchronized void confirmLeaderSessionID(UUID leaderSessionID) { + if (confirmationFuture != null) { + confirmationFuture.complete(leaderSessionID); + } } @Override - public boolean hasLeadership() { + public synchronized boolean hasLeadership() { return hasLeadership; } - public void isLeader(UUID leaderSessionID) { + public synchronized CompletableFuture<UUID> isLeader(UUID leaderSessionID) { + if (confirmationFuture != null) { + confirmationFuture.cancel(false); + } + confirmationFuture = new CompletableFuture<>(); hasLeadership = true; contender.grantLeadership(leaderSessionID); + + return confirmationFuture; } - public void notLeader() { + public synchronized void notLeader() { hasLeadership = false; contender.revokeLeadership(); } - public void reset() { + public synchronized void reset() { contender = null; hasLeadership = false; } - public String getAddress() { + public synchronized String getAddress() { return contender.getAddress(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index fb1a1c3..c3398c4 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -28,7 +28,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; import org.apache.flink.runtime.resourcemanager.ResourceManager; @@ -228,7 +227,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements } @Override - public void stopWorker(InstanceID instanceId) { + public void stopWorker(ResourceID resourceID) { // TODO: Implement to stop the worker }