Addressing review comments and another fix * Addressed review comments. * Fixed problems with NodeManager restart. Credit to Santosh Marella for this. If a NodeManager dies, the Myriad scheduler will relaunch it. Previously, we ignored the node-added event if we had a record of a previous NM running on that node. Fixed this. Also, the RM does not immediately detect the loss of an NM; it does so only after a timeout interval. After the timeout interval, the NM node is removed from the scheduler. However, during the interval between the NM dying and the RM detecting this, a new NM could be launched on the node. After the timeout, this newly running NM was being removed from the scheduler. Fixed this.
* Tried failing over RM + scheduler while a job was in progress successfully. * Tried killing NM. NM was re-launched successfully and continued running yarn containers (and corresponding Mesos tasks) after restart. Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/e7c81e4f Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/e7c81e4f Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/e7c81e4f Branch: refs/heads/master Commit: e7c81e4f690c7f7374dbc866c3c87ec68ad79995 Parents: 9677ef8 Author: Swapnil Daingade <sdaing...@maprtech.com> Authored: Wed Sep 2 10:45:00 2015 -0700 Committer: Swapnil Daingade <sdaing...@maprtech.com> Committed: Wed Sep 2 10:45:00 2015 -0700 ---------------------------------------------------------------------- .../java/com/ebay/myriad/executor/MyriadExecutor.java | 10 ++++------ .../java/com/ebay/myriad/scheduler/MyriadOperations.java | 1 + .../com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java | 5 ----- .../myriad/scheduler/fgs/YarnNodeCapacityManager.java | 6 +----- .../main/java/com/ebay/myriad/state/SchedulerState.java | 5 ++--- 5 files changed, 8 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/e7c81e4f/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java ---------------------------------------------------------------------- diff --git a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java index 42636f5..4f1e4e9 100644 --- a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java +++ b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java @@ -39,9 +39,6 @@ public class MyriadExecutor implements Executor { private static final Logger LOGGER = 
LoggerFactory.getLogger(MyriadExecutor.class); - private static final String YARN_CONTAINER_TASK_ID_PREFIX - = "yarn_"; - private Set<String> containerIds; public MyriadExecutor(Set<String> containerTaskIds) { @@ -79,14 +76,15 @@ public class MyriadExecutor implements Executor { LOGGER.debug("killTask received for taskId: " + taskId.getValue()); TaskStatus status; - if (!taskId.toString().contains(YARN_CONTAINER_TASK_ID_PREFIX)) { + if (!taskId.toString().contains( + MyriadExecutorAuxService.YARN_CONTAINER_TASK_ID_PREFIX)) { // Inform mesos of killing all tasks corresponding to yarn containers that are // currently running synchronized (containerIds) { for (String containerId : containerIds) { Protos.TaskID containerTaskId = Protos.TaskID.newBuilder() - .setValue(YARN_CONTAINER_TASK_ID_PREFIX + containerId) - .build(); + .setValue(MyriadExecutorAuxService.YARN_CONTAINER_TASK_ID_PREFIX + + containerId).build(); status = TaskStatus.newBuilder().setTaskId(containerTaskId) .setState(TaskState.TASK_KILLED) .build(); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/e7c81e4f/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java index 8c15bfa..4947647 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java @@ -82,6 +82,7 @@ public class MyriadOperations { for (NodeTask nodeTask : activeTasks) { if (nodeTask.getHostname().equals(nodeToScaleDown)) { nodePresentInMyriad = true; + break; } } if (!nodePresentInMyriad) { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/e7c81e4f/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java 
---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java index 47393a4..ba2c3b7 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java @@ -89,11 +89,6 @@ public class NMHeartBeatHandler extends BaseInterceptor { } break; - case EXPIRE: { - nodeStore.remove(event.getNodeId().getHost()); - } - break; - case STATUS_UPDATE: { handleStatusUpdate(event, context); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/e7c81e4f/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java index 12bbe73..696e82f 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java @@ -100,10 +100,6 @@ public class YarnNodeCapacityManager extends BaseInterceptor { NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event; NodeId nodeId = nodeAddedEvent.getAddedRMNode().getNodeID(); String host = nodeId.getHost(); - if (nodeStore.isPresent(host)) { - LOGGER.warn("Ignoring duplicate node registration. 
Host: {}", host); - return; - } SchedulerNode node = yarnScheduler.getSchedulerNode(nodeId); nodeStore.add(node); @@ -197,7 +193,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor { public void setNodeCapacity(RMNode rmNode, Resource newCapacity) { rmNode.getTotalCapability().setMemory(newCapacity.getMemory()); rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores()); - LOGGER.info("Setting capacity for node {} to {}", rmNode.getHostName(), newCapacity); + LOGGER.debug("Setting capacity for node {} to {}", rmNode.getHostName(), newCapacity); // updates the scheduler with the new capacity for the NM. // the event is handled by the scheduler asynchronously rmContext.getDispatcher().getEventHandler().handle( http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/e7c81e4f/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java index f589056..4b5aff3 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java @@ -48,7 +48,6 @@ public class SchedulerState { private Set<Protos.TaskID> activeTasks; private Set<Protos.TaskID> lostTasks; private Set<Protos.TaskID> killableTasks; - //private MyriadState myriadState; private Protos.FrameworkID frameworkId; private MyriadStateStore stateStore; @@ -152,7 +151,7 @@ public class SchedulerState { updateStateStore(); } - public Set<Protos.TaskID> getKillableTasks() { + public synchronized Set<Protos.TaskID> getKillableTasks() { return Collections.unmodifiableSet(this.killableTasks); } @@ -175,7 +174,7 @@ public class SchedulerState { return Collections.unmodifiableSet(this.pendingTasks); } - public Set<Protos.TaskID> getActiveTaskIds() { + public 
synchronized Set<Protos.TaskID> getActiveTaskIds() { return Collections.unmodifiableSet(this.activeTasks); }