Multiple fixes for making Myriad work smoothly * Addressed review comments * If a NMTask is flexed down, we now make sure that all the container tasks running on that NM are flexed down first before the NMTask is stopped. * We send TASK_RUNNING and TASK_FINISHED status updates for tasks launched as yarn containers * We read whether HA is enabled or disabled from the yml file. HA is disabled by default. Introduced a new field "haEnabled" into the yml file * We allow for multiple implementations of the state store for HA purposes as long as they implement the MyriadStateStore interface * Flexdown fixes ** If a NM is flexed down, it takes time for the RM to realize the NM is no longer up. However the Myriad Scheduler has already forgotten about this NM. This difference in view between RM and Myriad scheduler was causing fewer than the desired number of nodes to be flexed down * Fixed problem of NMTasks with same executor id launching NMs with different command line. Currently the format of the executor id for myriad executor is myriad_executor<Slave_ID>. With the NM and Myriad executor merge, each NMTask will have a different command line as the ports for each NM will be different. This causes problems with Mesos. If the same executor id is used to launch different commands we get TASK_ERROR with "Task has invalid ExecutorInfo" error message. Changed the myriad executor id to be more dynamic. Its new format is myriad_executor<frameworkId><offerID><SlaveId> * Fixed problem with FGS not working if NMTasks are in pending state. Currently if we have pending NMTasks, we try to run them against every offer we receive from mesos. If we cannot run the NMTask against an offer, we decline the offer. However we run only one NM per cluster node, hence if the number of NMs exceeds the number of cluster nodes, NMTasks are bound to remain pending. This causes all mesos offers to be declined. 
These declined mesos offers were being used for FGS causing TASK_LOST errors with message "Task launched with invalid offers" * We make sure that Executor info used for launching NMTasks is preserved in state store, so it can be used to launch mesos tasks for yarn containers even after a RM+scheduler restart. * Better synchronization for SchedulerState, eliminating race conditions. * Made sure SchedulerState is independent of MyriadStateStore implementation
* Tested Myriad HA while running long running jobs * Tested flexup/flexdown Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/9677ef89 Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/9677ef89 Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/9677ef89 Branch: refs/heads/phase1 Commit: 9677ef896b25a4d317a6e2f1c3f18dfa3fdf359c Parents: 0fa49c2 Author: Swapnil Daingade <sdaing...@maprtech.com> Authored: Sat Aug 29 09:43:15 2015 -0700 Committer: Swapnil Daingade <sdaing...@maprtech.com> Committed: Sat Aug 29 11:41:33 2015 -0700 ---------------------------------------------------------------------- .../ebay/myriad/executor/MyriadExecutor.java | 120 ++++++++++++------- .../executor/MyriadExecutorAuxService.java | 27 ++++- .../src/main/java/com/ebay/myriad/Main.java | 4 +- .../main/java/com/ebay/myriad/MyriadModule.java | 16 ++- .../configuration/MyriadConfiguration.java | 12 ++ .../scheduler/DownloadNMExecutorCLGenImpl.java | 6 +- .../ebay/myriad/scheduler/MyriadOperations.java | 17 +++ .../myriad/scheduler/NMExecutorCLGenImpl.java | 8 +- .../com/ebay/myriad/scheduler/TaskFactory.java | 17 +-- .../handlers/ResourceOffersEventHandler.java | 35 ++---- .../com/ebay/myriad/state/MyriadStateStore.java | 6 +- .../com/ebay/myriad/state/SchedulerState.java | 100 ++++++++-------- .../recovery/MyriadFileSystemRMStateStore.java | 23 ++-- .../main/resources/myriad-config-default.yml | 1 + 14 files changed, 236 insertions(+), 156 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/9677ef89/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java ---------------------------------------------------------------------- diff --git a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java 
b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java index 633777b..42636f5 100644 --- a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java +++ b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java @@ -17,6 +17,7 @@ package com.ebay.myriad.executor; import org.apache.mesos.Executor; import org.apache.mesos.ExecutorDriver; +import org.apache.mesos.Protos; import org.apache.mesos.Protos.ExecutorInfo; import org.apache.mesos.Protos.FrameworkInfo; import org.apache.mesos.Protos.SlaveInfo; @@ -29,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.charset.Charset; +import java.util.Set; /** * Myriad's Executor @@ -37,58 +39,88 @@ public class MyriadExecutor implements Executor { private static final Logger LOGGER = LoggerFactory.getLogger(MyriadExecutor.class); - @Override - public void registered(ExecutorDriver driver, - ExecutorInfo executorInfo, - FrameworkInfo frameworkInfo, - SlaveInfo slaveInfo) { - LOGGER.debug("Registered ", executorInfo, " for framework ", frameworkInfo, " on mesos slave ", slaveInfo); - } + private static final String YARN_CONTAINER_TASK_ID_PREFIX + = "yarn_"; - @Override - public void reregistered(ExecutorDriver driver, SlaveInfo slaveInfo) { - LOGGER.debug("ReRegistered"); - } + private Set<String> containerIds; - @Override - public void disconnected(ExecutorDriver driver) { - LOGGER.info("Disconnected"); - } + public MyriadExecutor(Set<String> containerTaskIds) { + this.containerIds = containerTaskIds; + } + + @Override + public void registered(ExecutorDriver driver, ExecutorInfo executorInfo, + FrameworkInfo frameworkInfo, SlaveInfo slaveInfo) { + LOGGER.debug("Registered ", executorInfo, " for framework ", frameworkInfo, " on mesos slave ", slaveInfo); + } + + @Override + public void reregistered(ExecutorDriver driver, SlaveInfo slaveInfo) { + LOGGER.debug("ReRegistered"); + } + + @Override + public void disconnected(ExecutorDriver 
driver) { + LOGGER.info("Disconnected"); + } + + @Override + public void launchTask(final ExecutorDriver driver, final TaskInfo task) { + LOGGER.debug("launchTask received for taskId: " + task.getTaskId()); + TaskStatus status = TaskStatus.newBuilder() + .setTaskId(task.getTaskId()) + .setState(TaskState.TASK_RUNNING) + .build(); + driver.sendStatusUpdate(status); + } + + @Override + public void killTask(ExecutorDriver driver, TaskID taskId) { + LOGGER.debug("killTask received for taskId: " + taskId.getValue()); + TaskStatus status; - @Override - public void launchTask(final ExecutorDriver driver, final TaskInfo task) { - TaskStatus status = TaskStatus.newBuilder() - .setTaskId(task.getTaskId()) - .setState(TaskState.TASK_RUNNING) + if (!taskId.toString().contains(YARN_CONTAINER_TASK_ID_PREFIX)) { + // Inform mesos of killing all tasks corresponding to yarn containers that are + // currently running + synchronized (containerIds) { + for (String containerId : containerIds) { + Protos.TaskID containerTaskId = Protos.TaskID.newBuilder() + .setValue(YARN_CONTAINER_TASK_ID_PREFIX + containerId) + .build(); + status = TaskStatus.newBuilder().setTaskId(containerTaskId) + .setState(TaskState.TASK_KILLED) + .build(); + driver.sendStatusUpdate(status); + } + } + + // Now kill the node manager task + status = TaskStatus.newBuilder() + .setTaskId(taskId) + .setState(TaskState.TASK_KILLED) .build(); driver.sendStatusUpdate(status); - } + LOGGER.info("NodeManager shutdown after receiving" + + " KillTask for taskId " + taskId.getValue()); + Runtime.getRuntime().exit(0); - @Override - public void killTask(ExecutorDriver driver, TaskID taskId) { - LOGGER.debug("KillTask received for taskId: " + taskId.getValue()); - - TaskStatus status = TaskStatus.newBuilder() - .setTaskId(taskId) - .setState(TaskState.TASK_KILLED) - .build(); - driver.sendStatusUpdate(status); - throw new RuntimeException("NodeManager shutdown after receiving" + - " KillTask for taskId " + taskId.getValue()); + } 
else { + LOGGER.debug("Cannot delete tasks corresponding to yarn container " + taskId); } + } - @Override - public void frameworkMessage(ExecutorDriver driver, byte[] data) { - LOGGER.info("Framework message received: ", new String(data, Charset.defaultCharset())); - } + @Override + public void frameworkMessage(ExecutorDriver driver, byte[] data) { + LOGGER.info("Framework message received: ", new String(data, Charset.defaultCharset())); + } - @Override - public void shutdown(ExecutorDriver driver) { - LOGGER.debug("Shutdown"); - } + @Override + public void shutdown(ExecutorDriver driver) { + LOGGER.debug("Shutdown"); + } - @Override - public void error(ExecutorDriver driver, String message) { - LOGGER.error("Error message: " + message); - } + @Override + public void error(ExecutorDriver driver, String message) { + LOGGER.error("Error message: " + message); + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/9677ef89/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java ---------------------------------------------------------------------- diff --git a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java index a6d126a..54b7ac9 100644 --- a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java +++ b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java @@ -19,11 +19,14 @@ package com.ebay.myriad.executor; import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.Set; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; import org.apache.hadoop.yarn.server.api.AuxiliaryService; +import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; import 
org.apache.hadoop.yarn.server.api.ContainerTerminationContext; import org.apache.mesos.MesosExecutorDriver; @@ -45,6 +48,10 @@ public class MyriadExecutorAuxService extends AuxiliaryService { public static final String YARN_CONTAINER_TASK_ID_PREFIX = "yarn_"; private MesosExecutorDriver driver; + private Thread myriadExecutorThread; + // Storing container id strings as it is difficult to get access to + // NodeManager's NMContext object from an auxiliary service. + private Set<String> containerIds = new HashSet<>(); protected MyriadExecutorAuxService() { super(SERVICE_NAME); @@ -54,13 +61,14 @@ public class MyriadExecutorAuxService extends AuxiliaryService { protected void serviceStart() throws Exception { LOGGER.info("Starting MyriadExecutor..."); - new Thread(new Runnable() { + myriadExecutorThread = new Thread(new Runnable() { public void run() { - driver = new MesosExecutorDriver(new MyriadExecutor()); + driver = new MesosExecutorDriver(new MyriadExecutor(containerIds)); LOGGER.error("MyriadExecutor exit with status " + Integer.toString(driver.run() == Status.DRIVER_STOPPED ? 
0 : 1)); } - }).start(); + }); + myriadExecutorThread.start(); } @Override @@ -81,7 +89,20 @@ public class MyriadExecutorAuxService extends AuxiliaryService { } @Override + public void initializeContainer(ContainerInitializationContext initContainerContext) { + ContainerId containerId = initContainerContext.getContainerId(); + synchronized (containerIds) { + containerIds.add(containerId.toString()); + } + sendStatus(containerId, TaskState.TASK_RUNNING); + } + + @Override public void stopContainer(ContainerTerminationContext stopContainerContext) { + ContainerId containerId = stopContainerContext.getContainerId(); + synchronized (containerIds) { + containerIds.remove(containerId.toString()); + } sendStatus(stopContainerContext.getContainerId(), TaskState.TASK_FINISHED); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/9677ef89/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java b/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java index 9871d58..8cf2c58 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java @@ -95,13 +95,13 @@ public class Main { initWebApp(injector); initHealthChecks(injector); initProfiles(injector); - validateNMInstances(injector); + //validateNMInstances(injector); initDisruptors(injector); initRebalancerService(cfg, injector); initTerminatorService(injector); startMesosDriver(injector); - startNMInstances(injector); + //startNMInstances(injector); } private void startMesosDriver(Injector injector) { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/9677ef89/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java index b73754e..4b67361 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java @@ -29,6 +29,7 @@ import com.ebay.myriad.scheduler.TaskFactory; import com.ebay.myriad.scheduler.TaskFactory.NMTaskFactoryImpl; import com.ebay.myriad.scheduler.fgs.YarnNodeCapacityManager; import com.ebay.myriad.scheduler.yarn.interceptor.InterceptorRegistry; +import com.ebay.myriad.state.MyriadStateStore; import com.ebay.myriad.state.SchedulerState; import com.ebay.myriad.webapp.HttpConnectorProvider; import com.google.inject.AbstractModule; @@ -94,6 +95,19 @@ public class MyriadModule extends AbstractModule { @Singleton SchedulerState providesSchedulerState(MyriadConfiguration cfg) { LOGGER.debug("Configuring SchedulerState provider"); - return new SchedulerState(rmContext.getStateStore()); + MyriadStateStore myriadStateStore = null; + if (cfg.isHAEnabled()) { + myriadStateStore = providesMyriadStateStore(); + } + return new SchedulerState(myriadStateStore); + } + + private MyriadStateStore providesMyriadStateStore() { + // TODO (sdaingade) Read the implementation class from yml + // once multiple implementations are available. 
+ if (rmContext.getStateStore() instanceof MyriadStateStore) { + return (MyriadStateStore) rmContext.getStateStore(); + } + return null; } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/9677ef89/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/MyriadConfiguration.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/MyriadConfiguration.java b/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/MyriadConfiguration.java index ae0361e..58b407b 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/MyriadConfiguration.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/MyriadConfiguration.java @@ -65,6 +65,11 @@ public class MyriadConfiguration { public static final Boolean DEFAULT_REBALANCER = true; /** + * By default ha is turned off. + */ + public static final Boolean DEFAULT_HA_ENABLED = false; + + /** * By default framework failover timeout is 1 day. */ public static final Double DEFAULT_FRAMEWORK_FAILOVER_TIMEOUT_MS = 86400000.0; @@ -111,6 +116,9 @@ public class MyriadConfiguration { private Boolean rebalancer; @JsonProperty + private Boolean haEnabled; + + @JsonProperty private NodeManagerConfiguration nodemanager; @JsonProperty @@ -187,6 +195,10 @@ public class MyriadConfiguration { return rebalancer != null ? rebalancer : DEFAULT_REBALANCER; } + public Boolean isHAEnabled() { + return haEnabled != null ? 
haEnabled : DEFAULT_HA_ENABLED; + } + public NodeManagerConfiguration getNodeManagerConfiguration() { return this.nodemanager; } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/9677ef89/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java index c70d96d..c300c49 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java @@ -44,7 +44,7 @@ public class DownloadNMExecutorCLGenImpl extends NMExecutorCLGenImpl { @Override public String generateCommandLine() { - StringBuffer cmdLine = new StringBuffer(); + StringBuilder cmdLine = new StringBuilder(); LOGGER.info("Using remote distribution"); generateEnvironment(); @@ -57,7 +57,7 @@ public class DownloadNMExecutorCLGenImpl extends NMExecutorCLGenImpl { return cmdLine.toString(); } - private void appendNMExtractionCommands(StringBuffer cmdLine) { + private void appendNMExtractionCommands(StringBuilder cmdLine) { /* TODO(darinj): Overall this is messier than I'd like. We can't let mesos untar the distribution, since it will change the permissions. Instead we simply download the tarball and execute tar -xvpf. 
We also @@ -82,7 +82,7 @@ public class DownloadNMExecutorCLGenImpl extends NMExecutorCLGenImpl { .append("/etc/hadoop/yarn-site.xml;"); } - private void appendUser(StringBuffer cmdLine) { + private void appendUser(StringBuilder cmdLine) { cmdLine.append(" sudo -E -u ").append(cfg.getFrameworkUser().get()).append(" -H"); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/9677ef89/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java index ab466cf..8c15bfa 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Set; @@ -71,6 +72,22 @@ public class MyriadOperations { LOGGER.info("Will skip flexing down {} Node Manager instances that were launched but " + "have not yet registered with Resource Manager.", activeTasks.size() - nodesToScaleDown.size()); } + + // If a NM is flexed down it takes time for the RM to realize the NM is no longer up + // We need to make sure we filter out nodes that have already been flexed down + // but have not disappeared from the RM's view of the cluster + for (Iterator<String> iterator = nodesToScaleDown.iterator(); iterator.hasNext();) { + String nodeToScaleDown = iterator.next(); + boolean nodePresentInMyriad = false; + for (NodeTask nodeTask : activeTasks) { + if (nodeTask.getHostname().equals(nodeToScaleDown)) { + nodePresentInMyriad = true; + } + } + if (!nodePresentInMyriad) { + iterator.remove(); + } + } // TODO(Santosh): Make this more efficient by using a 
Map<HostName, NodeTask> in scheduler state for (int i = 0; i < numInstancesToScaleDown; i++) { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/9677ef89/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java index 572c359..10b9b5b 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java @@ -105,7 +105,7 @@ public class NMExecutorCLGenImpl implements ExecutorCommandLineGenerator { @Override public String generateCommandLine() { - StringBuffer cmdLine = new StringBuffer(); + StringBuilder cmdLine = new StringBuilder(); generateEnvironment(); appendCgroupsCmds(cmdLine); @@ -162,7 +162,7 @@ public class NMExecutorCLGenImpl implements ExecutorCommandLineGenerator { ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getShufflePort()).toString()); } - protected void appendEnvForNM(StringBuffer cmdLine) { + protected void appendEnvForNM(StringBuilder cmdLine) { cmdLine.append(" env "); for (Map.Entry<String, String> env : environment.entrySet()) { cmdLine.append(env.getKey()).append("=").append("\"") @@ -170,14 +170,14 @@ public class NMExecutorCLGenImpl implements ExecutorCommandLineGenerator { } } - protected void appendCgroupsCmds(StringBuffer cmdLine) { + protected void appendCgroupsCmds(StringBuilder cmdLine) { if (cfg.getNodeManagerConfiguration().getCgroups().or(Boolean.FALSE)) { cmdLine.append(" export TASK_DIR=`basename $PWD`;"); cmdLine.append(" chmod +x /sys/fs/cgroup/cpu/mesos/$TASK_DIR;"); } } - protected void appendYarnHomeExport(StringBuffer cmdLine) { + protected void appendYarnHomeExport(StringBuilder cmdLine) { if (environment.containsKey("YARN_HOME")) { 
cmdLine.append(" export YARN_HOME=" + environment.get("YARN_HOME") + ";"); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/9677ef89/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java index 4c51c93..a3077fb 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java @@ -9,11 +9,11 @@ import org.apache.mesos.Protos.CommandInfo; import org.apache.mesos.Protos.CommandInfo.URI; import org.apache.mesos.Protos.ExecutorID; import org.apache.mesos.Protos.ExecutorInfo; +import org.apache.mesos.Protos.FrameworkID; import org.apache.mesos.Protos.Offer; import org.apache.mesos.Protos.Resource; import org.apache.mesos.Protos.TaskInfo; import org.apache.mesos.Protos.TaskID; -import org.apache.mesos.Protos.SlaveID; import org.apache.mesos.Protos.Value; import org.apache.mesos.Protos.Value.Scalar; import org.slf4j.Logger; @@ -28,14 +28,16 @@ import java.util.Objects; * Creates Tasks based on mesos offers */ public interface TaskFactory { - TaskInfo createTask(Offer offer, TaskID taskId, NodeTask nodeTask); + TaskInfo createTask(Offer offer, FrameworkID frameworkId, + TaskID taskId, NodeTask nodeTask); // TODO(Santosh): This is needed because the ExecutorInfo constructed // to launch NM needs to be specified to launch placeholder tasks for // yarn containers (for fine grained scaling). // If mesos supports just specifying the 'ExecutorId' without the full // ExecutorInfo, we wouldn't need this interface method. 
- ExecutorInfo getExecutorInfoForSlave(SlaveID slave, CommandInfo commandInfo); + ExecutorInfo getExecutorInfoForSlave(FrameworkID frameworkId, + Offer offer, CommandInfo commandInfo); /** * Creates TaskInfo objects to launch NMs as mesos tasks. @@ -150,7 +152,7 @@ public interface TaskFactory { } @Override - public TaskInfo createTask(Offer offer, TaskID taskId, NodeTask nodeTask) { + public TaskInfo createTask(Offer offer, FrameworkID frameworkId, TaskID taskId, NodeTask nodeTask) { Objects.requireNonNull(offer, "Offer should be non-null"); Objects.requireNonNull(nodeTask, "NodeTask should be non-null"); @@ -166,7 +168,7 @@ public interface TaskFactory { .build(); CommandInfo commandInfo = getCommandInfo(profile, ports); - ExecutorInfo executorInfo = getExecutorInfoForSlave(offer.getSlaveId(), commandInfo); + ExecutorInfo executorInfo = getExecutorInfoForSlave(frameworkId, offer, commandInfo); TaskInfo.Builder taskBuilder = TaskInfo.newBuilder() .setName("task-" + taskId.getValue()) @@ -208,7 +210,7 @@ public interface TaskFactory { } @Override - public ExecutorInfo getExecutorInfoForSlave(SlaveID slave, + public ExecutorInfo getExecutorInfoForSlave(FrameworkID frameworkId, Offer offer, CommandInfo commandInfo) { Scalar executorMemory = Scalar.newBuilder() .setValue(taskUtils.getExecutorMemory()).build(); @@ -216,7 +218,8 @@ public interface TaskFactory { .setValue(taskUtils.getExecutorCpus()).build(); ExecutorID executorId = ExecutorID.newBuilder() - .setValue(EXECUTOR_PREFIX + slave.getValue()) + .setValue(EXECUTOR_PREFIX + frameworkId.getValue() + + offer.getId().getValue() + offer.getSlaveId().getValue()) .build(); return ExecutorInfo .newBuilder() http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/9677ef89/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java ---------------------------------------------------------------------- diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java index 915bd2f..07241d2 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java @@ -78,12 +78,10 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv LOGGER.debug("Pending tasks: {}", this.schedulerState.getPendingTaskIds()); driverOperationLock.lock(); try { - Set<Protos.TaskID> pendingTasks = schedulerState.getPendingTaskIds(); - if (CollectionUtils.isNotEmpty(pendingTasks)) { - for (Iterator<Offer> iterator = offers.iterator(); iterator.hasNext();) { - Offer offer = iterator.next(); - boolean offerMatch = false; - Protos.TaskID launchedTaskId = null; + for (Iterator<Offer> iterator = offers.iterator(); iterator.hasNext();) { + Offer offer = iterator.next(); + Set<Protos.TaskID> pendingTasks = schedulerState.getPendingTaskIds(); + if (CollectionUtils.isNotEmpty(pendingTasks)) { for (Protos.TaskID pendingTaskId : pendingTasks) { NodeTask taskToLaunch = schedulerState .getTask(pendingTaskId); @@ -92,7 +90,7 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv if (matches(offer, profile) && SchedulerUtils.isUniqueHostname(offer, schedulerState.getActiveTasks())) { - TaskInfo task = taskFactory.createTask(offer, pendingTaskId, + TaskInfo task = taskFactory.createTask(offer, schedulerState.getFrameworkID(), pendingTaskId, taskToLaunch); List<OfferID> offerIds = new ArrayList<>(); @@ -101,36 +99,21 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv tasks.add(task); LOGGER.info("Launching task: {} using offer: {}", task.getTaskId().getValue(), offer.getId()); LOGGER.debug("Launching task: {} with 
profile: {} using offer: {}", task, profile, offer); - driver.launchTasks(offerIds, tasks); - launchedTaskId = pendingTaskId; + schedulerState.makeTaskStaging(pendingTaskId); - // TODO (sdaingade) For every NM Task that we launch, we currently + // For every NM Task that we launch, we currently // need to backup the ExecutorInfo for that NM Task in the State Store. // Without this, we will not be able to launch tasks corresponding to yarn // containers. This is specially important in case the RM restarts. - if (task.hasExecutor() && taskToLaunch.getExecutorInfo() == null) { - taskToLaunch.setExecutorInfo(task.getExecutor()); - schedulerState.updateStateStore(); - } - + taskToLaunch.setExecutorInfo(task.getExecutor()); taskToLaunch.setHostname(offer.getHostname()); taskToLaunch.setSlaveId(offer.getSlaveId()); - offerMatch = true; + schedulerState.addTask(pendingTaskId, taskToLaunch); iterator.remove(); // remove the used offer from offers list break; } } - if (null != launchedTaskId) { - schedulerState.makeTaskStaging(launchedTaskId); - launchedTaskId = null; - } - if (!offerMatch) { - LOGGER.info( - "Declining offer {}, as it didn't match any pending task.", - offer); - driver.declineOffer(offer.getId()); - } } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/9677ef89/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java index 533eaed..2d28fbe 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java @@ -18,13 +18,15 @@ package com.ebay.myriad.state; +import com.ebay.myriad.state.utils.StoreContext; + /** * Interface implemented by all Myriad State Store implementations */ public interface 
MyriadStateStore { - byte[] loadMyriadState() throws Exception; + StoreContext loadMyriadState() throws Exception; - void storeMyriadState(byte[] myriadState) throws Exception; + void storeMyriadState(StoreContext storeContext) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/9677ef89/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java index e27e976..f589056 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java @@ -17,6 +17,7 @@ package com.ebay.myriad.state; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -34,7 +35,6 @@ import org.apache.mesos.Protos; import org.apache.mesos.Protos.SlaveID; import com.ebay.myriad.state.utils.StoreContext; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; /** * Represents the state of the Myriad scheduler @@ -48,11 +48,11 @@ public class SchedulerState { private Set<Protos.TaskID> activeTasks; private Set<Protos.TaskID> lostTasks; private Set<Protos.TaskID> killableTasks; - private MyriadState myriadState; + //private MyriadState myriadState; private Protos.FrameworkID frameworkId; - private RMStateStore stateStore; + private MyriadStateStore stateStore; - public SchedulerState(RMStateStore stateStore) { + public SchedulerState(MyriadStateStore stateStore) { this.tasks = new ConcurrentHashMap<>(); this.pendingTasks = new HashSet<>(); this.stagingTasks = new HashSet<>(); @@ -78,11 +78,13 @@ public class SchedulerState { } - private void addTask(Protos.TaskID taskId, NodeTask node) { + // TODO (sdaingade) 
Clone NodeTask + public synchronized void addTask(Protos.TaskID taskId, NodeTask node) { this.tasks.put(taskId, node); + updateStateStore(); } - public void updateTask(Protos.TaskStatus taskStatus) { + public synchronized void updateTask(Protos.TaskStatus taskStatus) { Objects.requireNonNull(taskStatus, "TaskStatus object shouldn't be null"); Protos.TaskID taskId = taskStatus.getTaskId(); if (this.tasks.containsKey(taskId)) { @@ -91,10 +93,9 @@ public class SchedulerState { updateStateStore(); } - public void makeTaskPending(Protos.TaskID taskId) { + public synchronized void makeTaskPending(Protos.TaskID taskId) { Objects.requireNonNull(taskId, "taskId cannot be empty or null"); - pendingTasks.add(taskId); stagingTasks.remove(taskId); activeTasks.remove(taskId); @@ -103,7 +104,7 @@ public class SchedulerState { updateStateStore(); } - public void makeTaskStaging(Protos.TaskID taskId) { + public synchronized void makeTaskStaging(Protos.TaskID taskId) { Objects.requireNonNull(taskId, "taskId cannot be empty or null"); @@ -115,7 +116,7 @@ public class SchedulerState { updateStateStore(); } - public void makeTaskActive(Protos.TaskID taskId) { + public synchronized void makeTaskActive(Protos.TaskID taskId) { Objects.requireNonNull(taskId, "taskId cannot be empty or null"); @@ -127,7 +128,7 @@ public class SchedulerState { updateStateStore(); } - public void makeTaskLost(Protos.TaskID taskId) { + public synchronized void makeTaskLost(Protos.TaskID taskId) { Objects.requireNonNull(taskId, "taskId cannot be empty or null"); @@ -139,7 +140,7 @@ public class SchedulerState { updateStateStore(); } - public void makeTaskKillable(Protos.TaskID taskId) { + public synchronized void makeTaskKillable(Protos.TaskID taskId) { Objects.requireNonNull(taskId, "taskId cannot be empty or null"); @@ -152,14 +153,15 @@ public class SchedulerState { } public Set<Protos.TaskID> getKillableTasks() { - return this.killableTasks; + return Collections.unmodifiableSet(this.killableTasks); } - 
public NodeTask getTask(Protos.TaskID taskId) { + // TODO (sdaingade) Clone NodeTask + public synchronized NodeTask getTask(Protos.TaskID taskId) { return this.tasks.get(taskId); } - public void removeTask(Protos.TaskID taskId) { + public synchronized void removeTask(Protos.TaskID taskId) { this.pendingTasks.remove(taskId); this.stagingTasks.remove(taskId); this.activeTasks.remove(taskId); @@ -169,15 +171,15 @@ public class SchedulerState { updateStateStore(); } - public Set<Protos.TaskID> getPendingTaskIds() { - return this.pendingTasks; + public synchronized Set<Protos.TaskID> getPendingTaskIds() { + return Collections.unmodifiableSet(this.pendingTasks); } public Set<Protos.TaskID> getActiveTaskIds() { - return this.activeTasks; + return Collections.unmodifiableSet(this.activeTasks); } - public Collection<NodeTask> getActiveTasks() { + public synchronized Collection<NodeTask> getActiveTasks() { List<NodeTask> activeNodeTasks = new ArrayList<>(); if (CollectionUtils.isNotEmpty(activeTasks) && CollectionUtils.isNotEmpty(tasks.values())) { @@ -187,10 +189,11 @@ public class SchedulerState { } } } - return activeNodeTasks; + return Collections.unmodifiableCollection(activeNodeTasks); } - public NodeTask getNodeTask(SlaveID slaveId) { + // TODO (sdaingade) Clone NodeTask + public synchronized NodeTask getNodeTask(SlaveID slaveId) { for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { if (entry.getValue().getSlaveId() != null && entry.getValue().getSlaveId().equals(slaveId)) { @@ -200,19 +203,17 @@ public class SchedulerState { return null; } - public Set<Protos.TaskID> getStagingTaskIds() { - return this.stagingTasks; + public synchronized Set<Protos.TaskID> getStagingTaskIds() { + return Collections.unmodifiableSet(this.stagingTasks); } - public Set<Protos.TaskID> getLostTaskIds() { - return this.lostTasks; + public synchronized Set<Protos.TaskID> getLostTaskIds() { + return Collections.unmodifiableSet(this.lostTasks); } - public MyriadState 
getMyriadState() { - return this.myriadState; - } - - public Collection<Protos.TaskStatus> getTaskStatuses() { + // TODO (sdaingade) Currently cannot return unmodifiableCollection + // as this will break ReconcileService code + public synchronized Collection<Protos.TaskStatus> getTaskStatuses() { Collection<Protos.TaskStatus> taskStatuses = new ArrayList<>(this.tasks.size()); Collection<NodeTask> tasks = this.tasks.values(); for (NodeTask task : tasks) { @@ -225,42 +226,44 @@ public class SchedulerState { return taskStatuses; } - public boolean hasTask(Protos.TaskID taskID) { + public synchronized boolean hasTask(Protos.TaskID taskID) { return this.tasks.containsKey(taskID); } - public Protos.FrameworkID getFrameworkID() { + public synchronized Protos.FrameworkID getFrameworkID() { return this.frameworkId; } - public void setFrameworkId(Protos.FrameworkID newFrameworkId) { + public synchronized void setFrameworkId(Protos.FrameworkID newFrameworkId) { this.frameworkId = newFrameworkId; updateStateStore(); } - public void updateStateStore() { - if (!isMyriadStateStore()) { + private synchronized void updateStateStore() { + if (this.stateStore == null) { + LOGGER.debug("Could not update state to state store as HA is disabled"); return; } + try { StoreContext sc = new StoreContext(frameworkId, tasks, pendingTasks, stagingTasks, activeTasks, lostTasks, killableTasks); - ((MyriadStateStore) stateStore).storeMyriadState( - sc.toSerializedContext().toByteArray()); - LOGGER.info("Scheduler state stored to state store"); + stateStore.storeMyriadState(sc); + LOGGER.info("Scheduler state updated to state store"); } catch (Exception e) { - LOGGER.error("Failed to write scheduler state to state store", e); + LOGGER.error("Failed to update scheduler state to state store", e); } } - private void loadStateStore() { - if (!isMyriadStateStore()) { + private synchronized void loadStateStore() { + if (this.stateStore == null) { + LOGGER.debug("Could not load state from state store as 
HA is disabled"); return; } + try { - byte[] stateStoreBytes = ((MyriadStateStore) stateStore).loadMyriadState(); - if (stateStoreBytes != null && stateStoreBytes.length > 0) { - StoreContext sc = StoreContext.fromSerializedBytes(stateStoreBytes); + StoreContext sc = stateStore.loadMyriadState(); + if (sc != null) { this.frameworkId = sc.getFrameworkId(); this.tasks.putAll(sc.getTasks()); this.pendingTasks.addAll(sc.getPendingTasks()); @@ -273,13 +276,4 @@ public class SchedulerState { LOGGER.error("Failed to read scheduler state from state store", e); } } - - private boolean isMyriadStateStore() { - if (!(stateStore instanceof MyriadStateStore)) { - LOGGER.error("State store is not an instance of " + - MyriadStateStore.class.getName() + ". Cannot load/store Scheduler state."); - return false; - } - return true; - } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/9677ef89/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java b/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java index 45bfcfd..426d7f2 100644 --- a/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java +++ b/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.ebay.myriad.state.MyriadStateStore; +import com.ebay.myriad.state.utils.StoreContext; /** * StateStore that stores Myriad state in addition to RM state to DFS. 
@@ -43,7 +44,7 @@ public class MyriadFileSystemRMStateStore extends FileSystemRMStateStore private static final String MYRIAD_STATE_FILE = "MyriadState"; private Path myriadPathRoot = null; - private byte[] myriadState = null; + private byte[] myriadStateBytes = null; @Override public synchronized void initInternal(Configuration conf) throws Exception{ @@ -69,8 +70,8 @@ public class MyriadFileSystemRMStateStore extends FileSystemRMStateStore // Throws IOException if file is not present. FileStatus fileStatus = fs.listStatus(myriadStatePath)[0]; FSDataInputStream in = fs.open(myriadStatePath); - myriadState = new byte[(int) fileStatus.getLen()]; - in.readFully(myriadState); + myriadStateBytes = new byte[(int) fileStatus.getLen()]; + in.readFully(myriadStateBytes); in.close(); } catch (IOException e) { LOGGER.error("State information for Myriad could not be loaded from: " @@ -80,22 +81,22 @@ public class MyriadFileSystemRMStateStore extends FileSystemRMStateStore } @Override - public synchronized byte[] loadMyriadState() throws Exception { - byte[] ms = null; - if (myriadState != null) { - ms = myriadState.clone(); - myriadState = null; + public synchronized StoreContext loadMyriadState() throws Exception { + StoreContext sc = null; + if (myriadStateBytes != null && myriadStateBytes.length > 0) { + sc = StoreContext.fromSerializedBytes(myriadStateBytes); + myriadStateBytes = null; } - return ms; + return sc; } @Override - public synchronized void storeMyriadState(byte[] myriadState) throws Exception{ + public synchronized void storeMyriadState(StoreContext sc) throws Exception{ Path myriadStatePath = new Path(myriadPathRoot, MYRIAD_STATE_FILE); LOGGER.info("Storing state informatio for Myriad at: " + myriadStatePath); try { - updateFile(myriadStatePath, myriadState); + updateFile(myriadStatePath, sc.toSerializedContext().toByteArray()); } catch (Exception e) { LOGGER.error("State information for Myriad could not be stored at: " + myriadStatePath, e); 
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/9677ef89/myriad-scheduler/src/main/resources/myriad-config-default.yml ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/resources/myriad-config-default.yml b/myriad-scheduler/src/main/resources/myriad-config-default.yml index be74043..14b82b0 100644 --- a/myriad-scheduler/src/main/resources/myriad-config-default.yml +++ b/myriad-scheduler/src/main/resources/myriad-config-default.yml @@ -27,6 +27,7 @@ profiles: nmInstances: # NMs to start with. Requires at least 1 NM with a non-zero profile. medium: 1 # <profile_name : instances> rebalancer: false +haEnabled: false nodemanager: jvmMaxMemoryMB: 1024 cpus: 0.2