Merge branch 'phase1' into issue_14

Conflicts:
	myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java
	myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/c895fb8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/c895fb8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/c895fb8d

Branch: refs/heads/master
Commit: c895fb8d6dc6b0ad736d3c86b525e86170e52062
Parents: a4ceb36 7145b9b
Author: Santosh Marella <smare...@maprtech.com>
Authored: Mon Aug 10 19:06:16 2015 -0700
Committer: Santosh Marella <smare...@maprtech.com>
Committed: Mon Aug 10 19:06:16 2015 -0700

----------------------------------------------------------------------
 docs/myriad-dev.md                              |   28 +
 .../myriad-remote-distribution-configuration.md |    2 +-
 .../com/ebay/myriad/executor/NMTaskConfig.java  |   37 +
 .../ebay/myriad/executor/MyriadExecutor.java    |   18 +-
 .../java/com/ebay/myriad/scheduler/NMPorts.java |   65 +
 .../com/ebay/myriad/scheduler/TaskFactory.java  |  496 +-
 .../handlers/ResourceOffersEventHandler.java    |  302 +-
 .../webapp/css/bootstrap-default.min.css        |    5 -
 .../resources/webapp/css/bootstrap-myriad.css   | 6492 ++++++++++++++++++
 .../webapp/css/bootstrap-slate.min.css          |    7 -
 .../main/resources/webapp/css/bootstrap.min.css |    7 -
 .../src/main/resources/webapp/css/myriad.css    |   20 +-
 .../main/resources/webapp/img/favicon-16x16.png |  Bin 0 -> 1362 bytes
 .../main/resources/webapp/img/favicon-32x32.png |  Bin 0 -> 2262 bytes
 .../main/resources/webapp/img/favicon-96x96.png |  Bin 0 -> 6872 bytes
 .../src/main/resources/webapp/img/favicon.ico   |  Bin 1150 -> 1150 bytes
 .../main/resources/webapp/img/navbar_logo.png   |  Bin 0 -> 18128 bytes
 .../src/main/resources/webapp/index.html        |    2 +-
 .../webapp/js/components/NavbarComponent.js     |    8 +-
 .../webapp/public/css/bootstrap-myriad-2.css    | 6492 ++++++++++++++++++
 .../webapp/public/css/bootstrap-myriad.css      | 6492 ++++++++++++++++++
 .../main/resources/webapp/public/css/myriad.css |   20 +-
 .../webapp/public/img/favicon-16x16.png         |  Bin 0 -> 1362 bytes
 .../webapp/public/img/favicon-32x32.png         |  Bin 0 -> 2262 bytes
 .../webapp/public/img/favicon-96x96.png         |  Bin 0 -> 6872 bytes
 .../resources/webapp/public/img/favicon.ico     |  Bin 1150 -> 1150 bytes
 .../resources/webapp/public/img/navbar_logo.png |  Bin 0 -> 18128 bytes
 .../src/main/resources/webapp/public/index.html |    2 +-
 .../main/resources/webapp/public/js/myriad.js   | 2142 +++---
 shutdown.sh                                     |    6 +-
 30 files changed, 21335 insertions(+), 1308 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/c895fb8d/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java
----------------------------------------------------------------------
diff --cc myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java
index 2d5c62f,42535c5..dc8652c
--- a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java
+++ b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java
@@@ -114,18 -122,9 +123,18 @@@ public class MyriadExecutor implements
      @Override
      public void launchTask(final ExecutorDriver driver, final TaskInfo task) {
+         // A task id beginning with "yarn" is a mesos task for a yarn container
+         if (task.getTaskId().getValue().startsWith("yarn")) {
+             TaskStatus status = TaskStatus.newBuilder().setTaskId(task.getTaskId())
+                     .setState(TaskState.TASK_RUNNING).build();
+             driver.sendStatusUpdate(status);
+             return;
+         }
+
+         // Launch NM as a task and return status to framework
          new Thread(new Runnable() {
              public void run() {
-                 Builder statusBuilder = TaskStatus.newBuilder()
+                 TaskStatus.Builder statusBuilder = TaskStatus.newBuilder()
                          .setTaskId(task.getTaskId());
                  try {
                      NMTaskConfig taskConfig = GSON.fromJson(task.getData().toStringUtf8(), NMTaskConfig.class);

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/c895fb8d/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java
----------------------------------------------------------------------
diff --cc myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java
index a5aeae3,0e2ce16..723d4cf
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java
@@@ -30,75 -30,97 +31,104 @@@ import java.util.Iterator
  import java.util.Objects;

  /**
-  * Factory to create TaskInfo objects.
+  * Creates Tasks based on mesos offers
   */
  public interface TaskFactory {
--    TaskInfo createTask(Offer offer, TaskID taskId, NodeTask nodeTask);
-
-     // TODO(Santosh): This is needed because the ExecutorInfo constructed
-     // to launch NM needs to be specified to launch placeholder tasks for
-     // yarn containers (for fine grained scaling).
-     // If mesos supports just specifying the 'ExecutorId' without the full
-     // ExecutorInfo, we wouldn't need this interface method.
-     ExecutorInfo getExecutorInfoForSlave(SlaveID slave);
--
--    /**
-      * Creates TaskInfo objects to launch NMs as mesos tasks.
-      * The Node Manager Task factory implementation
--     */
--    class NMTaskFactoryImpl implements TaskFactory {
--        public static final String EXECUTOR_NAME = "myriad_task";
--        public static final String EXECUTOR_PREFIX = "myriad_executor";
--        public static final String YARN_NODEMANAGER_OPTS_KEY = "YARN_NODEMANAGER_OPTS";
--        private static final String YARN_RESOURCEMANAGER_HOSTNAME = "yarn.resourcemanager.hostname";
--        private static final String YARN_RESOURCEMANAGER_WEBAPP_ADDRESS = "yarn.resourcemanager.webapp.address";
--        private static final String YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS = "yarn.resourcemanager.webapp.https.address";
--        private static final String YARN_HTTP_POLICY = "yarn.http.policy";
--        private static final String YARN_HTTP_POLICY_HTTPS_ONLY = "HTTPS_ONLY";
--
--        private static final Logger LOGGER = LoggerFactory.getLogger(NMTaskFactoryImpl.class);
--        private MyriadConfiguration cfg;
--        private TaskUtils taskUtils;
--
--        @Inject
--        public NMTaskFactoryImpl(MyriadConfiguration cfg, TaskUtils taskUtils) {
--            this.cfg = cfg;
--            this.taskUtils = taskUtils;
--        }
++    TaskInfo createTask(Offer offer, TaskID taskId, NodeTask nodeTask);
++
++    // TODO(Santosh): This is needed because the ExecutorInfo constructed
++    // to launch NM needs to be specified to launch placeholder tasks for
++    // yarn containers (for fine grained scaling).
++    // If mesos supports just specifying the 'ExecutorId' without the full
++    // ExecutorInfo, we wouldn't need this interface method.
++    ExecutorInfo getExecutorInfoForSlave(SlaveID slave);
++
++    /**
++     * Creates TaskInfo objects to launch NMs as mesos tasks.
++ */ ++ class NMTaskFactoryImpl implements TaskFactory { ++ public static final String EXECUTOR_NAME = "myriad_task"; ++ public static final String EXECUTOR_PREFIX = "myriad_executor"; ++ public static final String YARN_NODEMANAGER_OPTS_KEY = "YARN_NODEMANAGER_OPTS"; ++ private static final String YARN_RESOURCEMANAGER_HOSTNAME = "yarn.resourcemanager.hostname"; ++ private static final String YARN_RESOURCEMANAGER_WEBAPP_ADDRESS = "yarn.resourcemanager.webapp.address"; ++ private static final String YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS = "yarn.resourcemanager.webapp.https.address"; ++ private static final String YARN_HTTP_POLICY = "yarn.http.policy"; ++ private static final String YARN_HTTP_POLICY_HTTPS_ONLY = "HTTPS_ONLY"; ++ ++ private static final Logger LOGGER = LoggerFactory.getLogger(NMTaskFactoryImpl.class); ++ private MyriadConfiguration cfg; ++ private TaskUtils taskUtils; ++ ++ @Inject ++ public NMTaskFactoryImpl(MyriadConfiguration cfg, TaskUtils taskUtils) { ++ this.cfg = cfg; ++ this.taskUtils = taskUtils; ++ } - private static String getFileName(String uri) { - int lastSlash = uri.lastIndexOf('/'); - if (lastSlash == -1) { - return uri; - } else { - String fileName = uri.substring(lastSlash + 1); - Preconditions.checkArgument(!Strings.isNullOrEmpty(fileName), - "URI should not have a slash at the end"); - return fileName; - //Utility function to get the first NMPorts.expectedNumPorts number of ports of an offer - private static NMPorts getPorts(Offer offer) { - HashSet<Long> ports = new HashSet<>(); - for (Resource resource : offer.getResourcesList()){ - if (resource.getName().equals("ports")){ ++ //Utility function to get the first NMPorts.expectedNumPorts number of ports of an offer ++ private static NMPorts getPorts(Offer offer) { ++ HashSet<Long> ports = new HashSet<>(); ++ for (Resource resource : offer.getResourcesList()){ ++ if (resource.getName().equals("ports")){ + /* + ranges.getRangeList() returns a list of ranges, each range specifies a begin and end only. + so must loop though each range until we get all ports needed. We exit each loop as soon as all + ports are found so bounded by NMPorts.expectedNumPorts. 
+ */ - Iterator<Value.Range> itr = resource.getRanges().getRangeList().iterator(); - while (itr.hasNext() && ports.size() < NMPorts.expectedNumPorts()) { - Value.Range range = itr.next(); - if (range.getBegin() <= range.getEnd()) { - long i = range.getBegin(); - while (i <= range.getEnd() && ports.size() < NMPorts.expectedNumPorts()) { - ports.add(i); - i++; - } - } - } - } ++ Iterator<Value.Range> itr = resource.getRanges().getRangeList().iterator(); ++ while (itr.hasNext() && ports.size() < NMPorts.expectedNumPorts()) { ++ Value.Range range = itr.next(); ++ if (range.getBegin() <= range.getEnd()) { ++ long i = range.getBegin(); ++ while (i <= range.getEnd() && ports.size() < NMPorts.expectedNumPorts()) { ++ ports.add(i); ++ i++; ++ } } - - Preconditions.checkState(ports.size() == NMPorts.expectedNumPorts(), "Not enough ports in offer"); - Long [] portArray = ports.toArray(new Long [ports.size()]); - return new NMPorts(portArray); ++ } } ++ } - private String getConfigurationUrl() { - YarnConfiguration conf = new YarnConfiguration(); - String httpPolicy = conf.get(YARN_HTTP_POLICY); - if (httpPolicy != null && httpPolicy.equals(YARN_HTTP_POLICY_HTTPS_ONLY)) { - String address = conf.get(YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS); - if (address == null || address.isEmpty()) { - address = conf.get(YARN_RESOURCEMANAGER_HOSTNAME) + ":8090"; - } - return "https://" + address + "/conf"; - private static String getFileName(String uri) { - int lastSlash = uri.lastIndexOf('/'); - if (lastSlash == -1) { - return uri; -- } else { - String address = conf.get(YARN_RESOURCEMANAGER_WEBAPP_ADDRESS); - if (address == null || address.isEmpty()) { - address = conf.get(YARN_RESOURCEMANAGER_HOSTNAME) + ":8088"; - } - return "http://" + address + "/conf"; - String fileName = uri.substring(lastSlash + 1); - Preconditions.checkArgument(!Strings.isNullOrEmpty(fileName), - "URI should not have a slash at the end"); - return fileName; -- } - } ++ Preconditions.checkState(ports.size() == NMPorts.expectedNumPorts(), "Not enough ports in offer"); ++ Long [] portArray = ports.toArray(new Long [ports.size()]); ++ return new NMPorts(portArray); ++ } + - private String getConfigurationUrl() { - YarnConfiguration conf = new YarnConfiguration(); - String httpPolicy = conf.get(YARN_HTTP_POLICY); - if (httpPolicy != null && httpPolicy.equals(YARN_HTTP_POLICY_HTTPS_ONLY)) { - String address = conf.get(YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS); - if (address == null || address.isEmpty()) { - address = conf.get(YARN_RESOURCEMANAGER_HOSTNAME) + ":8090"; - } - return "https://" + address + "/conf"; - } else { - String address = conf.get(YARN_RESOURCEMANAGER_WEBAPP_ADDRESS); - if (address == null || address.isEmpty()) { - address = conf.get(YARN_RESOURCEMANAGER_HOSTNAME) + ":8088"; - } - return "http://" + address + "/conf"; - } ++ private static String getFileName(String uri) { ++ int lastSlash = uri.lastIndexOf('/'); ++ if (lastSlash == -1) { ++ return uri; ++ } else { ++ String fileName = uri.substring(lastSlash + 1); ++ Preconditions.checkArgument(!Strings.isNullOrEmpty(fileName), ++ "URI should not have a slash at the end"); ++ return fileName; ++ } ++ } ++ ++ private String getConfigurationUrl() { ++ YarnConfiguration conf = new YarnConfiguration(); ++ String httpPolicy = conf.get(YARN_HTTP_POLICY); ++ if (httpPolicy != null && httpPolicy.equals(YARN_HTTP_POLICY_HTTPS_ONLY)) { ++ String address = conf.get(YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS); ++ if (address == null || address.isEmpty()) { ++ address = 
conf.get(YARN_RESOURCEMANAGER_HOSTNAME) + ":8090"; ++ } ++ return "https://" + address + "/conf"; ++ } else { ++ String address = conf.get(YARN_RESOURCEMANAGER_WEBAPP_ADDRESS); ++ if (address == null || address.isEmpty()) { ++ address = conf.get(YARN_RESOURCEMANAGER_HOSTNAME) + ":8088"; } ++ return "http://" + address + "/conf"; ++ } ++ } - private Protos.CommandInfo getCommandInfo() { - private CommandInfo getCommandInfo() { -- MyriadExecutorConfiguration myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration(); -- CommandInfo.Builder commandInfo = CommandInfo.newBuilder(); -- if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) { ++ private CommandInfo getCommandInfo() { ++ MyriadExecutorConfiguration myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration(); ++ CommandInfo.Builder commandInfo = CommandInfo.newBuilder(); ++ if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) { /* TODO(darinj): Overall this is messier than I'd like. We can't let mesos untar the distribution, since it will change the permissions. Instead we simply download the tarball and execute tar -xvpf. We also @@@ -106,168 -128,193 +136,194 @@@ frameworkSuperUser. This will be refactored after Mesos-1790 is resolved. */ -- //Both FrameworkUser and FrameworkSuperuser to get all of the directory permissions correct. -- if (!(cfg.getFrameworkUser().isPresent() && cfg.getFrameworkSuperUser().isPresent())) { -- throw new RuntimeException("Trying to use remote distribution, but frameworkUser" + -- "and/or frameworkSuperUser not set!"); -- } -- -- LOGGER.info("Using remote distribution"); -- -- String nmURIString = myriadExecutorConfiguration.getNodeManagerUri().get(); -- -- //TODO(DarinJ) support other compression, as this is a temp fix for Mesos 1760 may not get to it. -- //Extract tarball keeping permissions, necessary to keep HADOOP_HOME/bin/container-executor suidbit set. -- String tarCmd = "sudo tar -zxpf " + getFileName(nmURIString); -- -- //We need the current directory to be writable by frameworkUser for capsuleExecutor to create directories. -- //Best to simply give owenership to the user running the executor but we don't want to use -R as this -- //will silently remove the suid bit on container executor. -- String chownCmd = "sudo chown " + cfg.getFrameworkUser().get() + " ."; -- -- //Place the hadoop config where in the HADOOP_CONF_DIR where it will be read by the NodeManager -- //The url for the resource manager config is: http(s)://hostname:port/conf so fetcher.cpp downloads the -- //config file to conf, It's an xml file with the parameters of yarn-site.xml, core-site.xml and hdfs.xml. 
-- String configCopyCmd = "cp conf " + cfg.getYarnEnvironment().get("YARN_HOME") + -- "/etc/hadoop/yarn-site.xml"; -- -- //Command to run the executor -- String executorPathString = myriadExecutorConfiguration.getPath(); -- String executorCmd = "export CAPSULE_CACHE_DIR=`pwd`;echo $CAPSULE_CACHE_DIR; " + -- "sudo -E -u " + cfg.getFrameworkUser().get() + " -H " + -- "java -Dcapsule.log=verbose -jar " + getFileName(executorPathString); -- -- //Concatenate all the subcommands -- String cmd = tarCmd + "&&" + chownCmd + "&&" + configCopyCmd + "&&" + executorCmd; -- -- //get the nodemanagerURI -- //We're going to extract ourselves, so setExtract is false -- LOGGER.info("Getting Hadoop distribution from:" + nmURIString); -- URI nmUri = URI.newBuilder().setValue(nmURIString).setExtract(false) -- .build(); -- -- //get configs directly from resource manager -- String configUrlString = getConfigurationUrl(); -- LOGGER.info("Getting config from:" + configUrlString); -- URI configUri = URI.newBuilder().setValue(configUrlString) -- .build(); -- -- //get the executor URI -- LOGGER.info("Getting executor from:" + executorPathString); -- URI executorUri = URI.newBuilder().setValue(executorPathString).setExecutable(true) -- .build(); -- -- LOGGER.info("Slave will execute command:" + cmd); -- commandInfo.addUris(nmUri).addUris(configUri).addUris(executorUri).setValue("echo \"" + cmd + "\";" + cmd); -- -- commandInfo.setUser(cfg.getFrameworkSuperUser().get()); -- -- } else { -- String cmdPrefix = "export CAPSULE_CACHE_DIR=`pwd` ;" + -- "echo $CAPSULE_CACHE_DIR; java -Dcapsule.log=verbose -jar "; -- String executorPath = myriadExecutorConfiguration.getPath(); -- String cmd = cmdPrefix + getFileName(executorPath); -- URI executorURI = URI.newBuilder().setValue(executorPath) -- .setExecutable(true).build(); -- commandInfo.addUris(executorURI) -- .setValue("echo \"" + cmd + "\";" + cmd); -- -- if (cfg.getFrameworkUser().isPresent()) { -- commandInfo.setUser(cfg.getFrameworkUser().get()); -- } -- } -- return commandInfo.build(); ++ //Both FrameworkUser and FrameworkSuperuser to get all of the directory permissions correct. 
++ if (!(cfg.getFrameworkUser().isPresent() && cfg.getFrameworkSuperUser().isPresent())) { ++ throw new RuntimeException("Trying to use remote distribution, but frameworkUser" + ++ "and/or frameworkSuperUser not set!"); } -- @Override -- public TaskInfo createTask(Offer offer, TaskID taskId, NodeTask nodeTask) { -- Objects.requireNonNull(offer, "Offer should be non-null"); -- Objects.requireNonNull(nodeTask, "NodeTask should be non-null"); - - NMPorts ports = getPorts(offer); - LOGGER.debug(ports.toString()); -- -- NMProfile profile = nodeTask.getProfile(); -- NMTaskConfig nmTaskConfig = new NMTaskConfig(); -- nmTaskConfig.setAdvertisableCpus(profile.getCpus()); -- nmTaskConfig.setAdvertisableMem(profile.getMemory()); -- NodeManagerConfiguration nodeManagerConfiguration = this.cfg.getNodeManagerConfiguration(); -- nmTaskConfig.setJvmOpts(nodeManagerConfiguration.getJvmOpts().orNull()); -- nmTaskConfig.setCgroups(nodeManagerConfiguration.getCgroups().or(Boolean.FALSE)); - nmTaskConfig.setRpcPort(ports.getRpcPort()); - nmTaskConfig.setLocalizerPort(ports.getLocalizerPort()); - nmTaskConfig.setWebAppHttpPort(ports.getWebAppHttpPort()); - nmTaskConfig.setShufflePort(ports.getShufflePort()); -- nmTaskConfig.setYarnEnvironment(cfg.getYarnEnvironment()); -- -- // if RM's hostname is passed in as a system property, pass it along -- // to Node Managers launched via Myriad -- String rmHostName = System.getProperty(YARN_RESOURCEMANAGER_HOSTNAME); -- if (rmHostName != null && !rmHostName.isEmpty()) { -- -- String nmOpts = nmTaskConfig.getYarnEnvironment().get(YARN_NODEMANAGER_OPTS_KEY); -- if (nmOpts == null) { -- nmOpts = ""; -- } -- nmOpts += " " + "-D" + YARN_RESOURCEMANAGER_HOSTNAME + "=" + rmHostName; -- nmTaskConfig.getYarnEnvironment().put(YARN_NODEMANAGER_OPTS_KEY, nmOpts); -- LOGGER.info(YARN_RESOURCEMANAGER_HOSTNAME + " is set to " + rmHostName + -- " via YARN_RESOURCEMANAGER_OPTS. Passing it into YARN_NODEMANAGER_OPTS."); -- } --// else { -- // TODO(Santosh): Handle this case. Couple of options: -- // 1. Lookup a hostname here and use it as "RM's hostname" -- // 2. Abort here.. RM cannot start unless a hostname is passed in as it requires it to pass to NMs. 
-- -- String taskConfigJSON = new Gson().toJson(nmTaskConfig); -- - Scalar taskMemory = Value.Scalar.newBuilder() - Scalar taskMemory = Scalar.newBuilder() -- .setValue(taskUtils.getTaskMemory(profile)) -- .build(); - Scalar taskCpus = Value.Scalar.newBuilder() - Scalar taskCpus = Scalar.newBuilder() -- .setValue(taskUtils.getTaskCpus(profile)) -- .build(); - ExecutorInfo executorInfo = getExecutorInfoForSlave(offer.getSlaveId()); - Scalar executorMemory = Scalar.newBuilder() - .setValue(taskUtils.getExecutorMemory()) - .build(); - Scalar executorCpus = Scalar.newBuilder() - .setValue(taskUtils.getExecutorCpus()) - .build(); - - CommandInfo commandInfo = getCommandInfo(); - - ExecutorID executorId = ExecutorID.newBuilder() - .setValue(EXECUTOR_PREFIX + offer.getSlaveId().getValue()) - .build(); - ExecutorInfo executorInfo = ExecutorInfo - .newBuilder() - .setCommand(commandInfo) - .setName(EXECUTOR_NAME) - .addResources( - Resource.newBuilder().setName("cpus") - .setType(Value.Type.SCALAR) - .setScalar(executorCpus) - .build()) - .addResources( - Resource.newBuilder().setName("mem") - .setType(Value.Type.SCALAR) - .setScalar(executorMemory) - .build()) - .setExecutorId(executorId).setCommand(commandInfo).build(); -- -- TaskInfo.Builder taskBuilder = TaskInfo.newBuilder() -- .setName("task-" + taskId.getValue()) -- .setTaskId(taskId) -- .setSlaveId(offer.getSlaveId()); -- - // TODO (mohit): Configure ports for multi-tenancy -- ByteString data = ByteString.copyFrom(taskConfigJSON.getBytes(Charset.defaultCharset())); -- return taskBuilder -- .addResources( -- Resource.newBuilder().setName("cpus") -- .setType(Value.Type.SCALAR) -- .setScalar(taskCpus) -- .build()) -- .addResources( -- Resource.newBuilder().setName("mem") -- .setType(Value.Type.SCALAR) -- .setScalar(taskMemory) -- .build()) - .addResources( - Resource.newBuilder().setName("ports") - .setType(Value.Type.RANGES) - .setRanges(Value.Ranges.newBuilder() - .addRange(Value.Range.newBuilder() - .setBegin(ports.getRpcPort()) - .setEnd(ports.getRpcPort()) - .build()) - .addRange(Value.Range.newBuilder() - .setBegin(ports.getLocalizerPort()) - .setEnd(ports.getLocalizerPort()) - .build()) - .addRange(Value.Range.newBuilder() - .setBegin(ports.getWebAppHttpPort()) - .setEnd(ports.getWebAppHttpPort()) - .build()) - .addRange(Value.Range.newBuilder() - .setBegin(ports.getShufflePort()) - .setEnd(ports.getShufflePort()) - .build()))) -- .setExecutor(executorInfo).setData(data).build(); ++ LOGGER.info("Using remote distribution"); ++ ++ String nmURIString = myriadExecutorConfiguration.getNodeManagerUri().get(); ++ ++ //TODO(DarinJ) support other compression, as this is a temp fix for Mesos 1760 may not get to it. ++ //Extract tarball keeping permissions, necessary to keep HADOOP_HOME/bin/container-executor suidbit set. ++ String tarCmd = "sudo tar -zxpf " + getFileName(nmURIString); ++ ++ //We need the current directory to be writable by frameworkUser for capsuleExecutor to create directories. ++ //Best to simply give owenership to the user running the executor but we don't want to use -R as this ++ //will silently remove the suid bit on container executor. ++ String chownCmd = "sudo chown " + cfg.getFrameworkUser().get() + " ."; ++ ++ //Place the hadoop config where in the HADOOP_CONF_DIR where it will be read by the NodeManager ++ //The url for the resource manager config is: http(s)://hostname:port/conf so fetcher.cpp downloads the ++ //config file to conf, It's an xml file with the parameters of yarn-site.xml, core-site.xml and hdfs.xml. 
++ String configCopyCmd = "cp conf " + cfg.getYarnEnvironment().get("YARN_HOME") + ++ "/etc/hadoop/yarn-site.xml"; ++ ++ //Command to run the executor ++ String executorPathString = myriadExecutorConfiguration.getPath(); ++ String executorCmd = "export CAPSULE_CACHE_DIR=`pwd`;echo $CAPSULE_CACHE_DIR; " + ++ "sudo -E -u " + cfg.getFrameworkUser().get() + " -H " + ++ "java -Dcapsule.log=verbose -jar " + getFileName(executorPathString); ++ ++ //Concatenate all the subcommands ++ String cmd = tarCmd + "&&" + chownCmd + "&&" + configCopyCmd + "&&" + executorCmd; ++ ++ //get the nodemanagerURI ++ //We're going to extract ourselves, so setExtract is false ++ LOGGER.info("Getting Hadoop distribution from:" + nmURIString); ++ URI nmUri = URI.newBuilder().setValue(nmURIString).setExtract(false) ++ .build(); ++ ++ //get configs directly from resource manager ++ String configUrlString = getConfigurationUrl(); ++ LOGGER.info("Getting config from:" + configUrlString); ++ URI configUri = URI.newBuilder().setValue(configUrlString) ++ .build(); ++ ++ //get the executor URI ++ LOGGER.info("Getting executor from:" + executorPathString); ++ URI executorUri = URI.newBuilder().setValue(executorPathString).setExecutable(true) ++ .build(); ++ ++ LOGGER.info("Slave will execute command:" + cmd); ++ commandInfo.addUris(nmUri).addUris(configUri).addUris(executorUri).setValue("echo \"" + cmd + "\";" + cmd); ++ ++ commandInfo.setUser(cfg.getFrameworkSuperUser().get()); ++ ++ } else { ++ String cmdPrefix = "export CAPSULE_CACHE_DIR=`pwd` ;" + ++ "echo $CAPSULE_CACHE_DIR; java -Dcapsule.log=verbose -jar "; ++ String executorPath = myriadExecutorConfiguration.getPath(); ++ String cmd = cmdPrefix + getFileName(executorPath); ++ URI executorURI = URI.newBuilder().setValue(executorPath) ++ .setExecutable(true).build(); ++ commandInfo.addUris(executorURI) ++ .setValue("echo \"" + cmd + "\";" + cmd); ++ ++ if (cfg.getFrameworkUser().isPresent()) { ++ commandInfo.setUser(cfg.getFrameworkUser().get()); + } ++ } ++ return commandInfo.build(); ++ } + - @Override - public ExecutorInfo getExecutorInfoForSlave(SlaveID slave) { - Scalar executorMemory = Scalar.newBuilder() - .setValue(taskUtils.getExecutorMemory()).build(); - Scalar executorCpus = Scalar.newBuilder() - .setValue(taskUtils.getExecutorCpus()).build(); - - CommandInfo commandInfo = getCommandInfo(); - - ExecutorID executorId = ExecutorID.newBuilder() - .setValue(EXECUTOR_PREFIX + slave.getValue()) - .build(); - return ExecutorInfo - .newBuilder() - .setCommand(commandInfo) - .setName(EXECUTOR_NAME) - .addResources( - Resource.newBuilder().setName("cpus") - .setType(Value.Type.SCALAR) - .setScalar(executorCpus).build()) - .addResources( - Resource.newBuilder().setName("mem") - .setType(Value.Type.SCALAR) - .setScalar(executorMemory).build()) - .setExecutorId(executorId).build(); ++ @Override ++ public TaskInfo createTask(Offer offer, TaskID taskId, NodeTask nodeTask) { ++ Objects.requireNonNull(offer, "Offer should be non-null"); ++ Objects.requireNonNull(nodeTask, "NodeTask should be non-null"); ++ ++ NMPorts ports = getPorts(offer); ++ LOGGER.debug(ports.toString()); ++ ++ NMProfile profile = nodeTask.getProfile(); ++ NMTaskConfig nmTaskConfig = new NMTaskConfig(); ++ nmTaskConfig.setAdvertisableCpus(profile.getCpus()); ++ nmTaskConfig.setAdvertisableMem(profile.getMemory()); ++ NodeManagerConfiguration nodeManagerConfiguration = this.cfg.getNodeManagerConfiguration(); ++ nmTaskConfig.setJvmOpts(nodeManagerConfiguration.getJvmOpts().orNull()); ++ 
nmTaskConfig.setCgroups(nodeManagerConfiguration.getCgroups().or(Boolean.FALSE)); ++ nmTaskConfig.setRpcPort(ports.getRpcPort()); ++ nmTaskConfig.setLocalizerPort(ports.getLocalizerPort()); ++ nmTaskConfig.setWebAppHttpPort(ports.getWebAppHttpPort()); ++ nmTaskConfig.setShufflePort(ports.getShufflePort()); ++ nmTaskConfig.setYarnEnvironment(cfg.getYarnEnvironment()); ++ ++ // if RM's hostname is passed in as a system property, pass it along ++ // to Node Managers launched via Myriad ++ String rmHostName = System.getProperty(YARN_RESOURCEMANAGER_HOSTNAME); ++ if (rmHostName != null && !rmHostName.isEmpty()) { ++ ++ String nmOpts = nmTaskConfig.getYarnEnvironment().get(YARN_NODEMANAGER_OPTS_KEY); ++ if (nmOpts == null) { ++ nmOpts = ""; } ++ nmOpts += " " + "-D" + YARN_RESOURCEMANAGER_HOSTNAME + "=" + rmHostName; ++ nmTaskConfig.getYarnEnvironment().put(YARN_NODEMANAGER_OPTS_KEY, nmOpts); ++ LOGGER.info(YARN_RESOURCEMANAGER_HOSTNAME + " is set to " + rmHostName + ++ " via YARN_RESOURCEMANAGER_OPTS. Passing it into YARN_NODEMANAGER_OPTS."); ++ } ++// else { ++ // TODO(Santosh): Handle this case. Couple of options: ++ // 1. Lookup a hostname here and use it as "RM's hostname" ++ // 2. Abort here.. RM cannot start unless a hostname is passed in as it requires it to pass to NMs. ++ ++ String taskConfigJSON = new Gson().toJson(nmTaskConfig); ++ ++ Scalar taskMemory = Scalar.newBuilder() ++ .setValue(taskUtils.getTaskMemory(profile)) ++ .build(); ++ Scalar taskCpus = Scalar.newBuilder() ++ .setValue(taskUtils.getTaskCpus(profile)) ++ .build(); ++ ExecutorInfo executorInfo = getExecutorInfoForSlave(offer.getSlaveId()); ++ ++ TaskInfo.Builder taskBuilder = TaskInfo.newBuilder() ++ .setName("task-" + taskId.getValue()) ++ .setTaskId(taskId) ++ .setSlaveId(offer.getSlaveId()); ++ ++ ByteString data = ByteString.copyFrom(taskConfigJSON.getBytes(Charset.defaultCharset())); ++ return taskBuilder ++ .addResources( ++ Resource.newBuilder().setName("cpus") ++ .setType(Value.Type.SCALAR) ++ .setScalar(taskCpus) ++ .build()) ++ .addResources( ++ Resource.newBuilder().setName("mem") ++ .setType(Value.Type.SCALAR) ++ .setScalar(taskMemory) ++ .build()) ++ .addResources( ++ Resource.newBuilder().setName("ports") ++ .setType(Value.Type.RANGES) ++ .setRanges(Value.Ranges.newBuilder() ++ .addRange(Value.Range.newBuilder() ++ .setBegin(ports.getRpcPort()) ++ .setEnd(ports.getRpcPort()) ++ .build()) ++ .addRange(Value.Range.newBuilder() ++ .setBegin(ports.getLocalizerPort()) ++ .setEnd(ports.getLocalizerPort()) ++ .build()) ++ .addRange(Value.Range.newBuilder() ++ .setBegin(ports.getWebAppHttpPort()) ++ .setEnd(ports.getWebAppHttpPort()) ++ .build()) ++ .addRange(Value.Range.newBuilder() ++ .setBegin(ports.getShufflePort()) ++ .setEnd(ports.getShufflePort()) ++ .build()))) ++ .setExecutor(executorInfo).setData(data).build(); ++ } ++ ++ @Override ++ public ExecutorInfo getExecutorInfoForSlave(SlaveID slave) { ++ Scalar executorMemory = Scalar.newBuilder() ++ .setValue(taskUtils.getExecutorMemory()).build(); ++ Scalar executorCpus = Scalar.newBuilder() ++ .setValue(taskUtils.getExecutorCpus()).build(); ++ ++ CommandInfo commandInfo = getCommandInfo(); ++ ++ ExecutorID executorId = ExecutorID.newBuilder() ++ .setValue(EXECUTOR_PREFIX + slave.getValue()) ++ .build(); ++ return ExecutorInfo ++ .newBuilder() ++ .setCommand(commandInfo) ++ .setName(EXECUTOR_NAME) ++ .addResources( ++ Resource.newBuilder().setName("cpus") ++ .setType(Value.Type.SCALAR) ++ .setScalar(executorCpus).build()) ++ .addResources( ++ 
Resource.newBuilder().setName("mem") ++ .setType(Value.Type.SCALAR) ++ .setScalar(executorMemory).build()) ++ .setExecutorId(executorId).build(); } ++ } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/c895fb8d/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java ---------------------------------------------------------------------- diff --cc myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java index 554f966,c009ce8..fbe93d6 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java @@@ -15,9 -15,12 +15,13 @@@ */ package com.ebay.myriad.scheduler.event.handlers; - import com.ebay.myriad.scheduler.*; + import com.ebay.myriad.scheduler.NMPorts; + import com.ebay.myriad.scheduler.NMProfile; + import com.ebay.myriad.scheduler.SchedulerUtils; + import com.ebay.myriad.scheduler.TaskFactory; + import com.ebay.myriad.scheduler.TaskUtils; import com.ebay.myriad.scheduler.event.ResourceOffersEvent; +import com.ebay.myriad.scheduler.fgs.OfferLifecycleManager; import com.ebay.myriad.state.NodeTask; import com.ebay.myriad.state.SchedulerState; import com.lmax.disruptor.EventHandler; @@@ -48,143 -48,176 +52,191 @@@ import java.util.concurrent.locks.Reent * handles and logs resource offers events */ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEvent> { -- private static final Logger LOGGER = LoggerFactory.getLogger(ResourceOffersEventHandler.class); - - private static final Lock driverOperationLock = new ReentrantLock(); - - @Inject - private SchedulerState schedulerState; - - @Inject - private TaskFactory taskFactory; - - @Inject - private TaskUtils taskUtils; - - @Override - public void onEvent(ResourceOffersEvent event, long sequence, - boolean endOfBatch) throws Exception { - SchedulerDriver driver = event.getDriver(); - List<Offer> offers = event.getOffers(); - - LOGGER.info("Received offers {}", offers.size()); - LOGGER.debug("Pending tasks: {}", this.schedulerState.getPendingTaskIds()); - driverOperationLock.lock(); - try { - Set<Protos.TaskID> pendingTasks = schedulerState.getPendingTaskIds(); - if (CollectionUtils.isNotEmpty(pendingTasks)) { - for (Offer offer : offers) { - boolean offerMatch = false; - Protos.TaskID launchedTaskId = null; - for (Protos.TaskID pendingTaskId : pendingTasks) { - NodeTask taskToLaunch = schedulerState - .getTask(pendingTaskId); - NMProfile profile = taskToLaunch.getProfile(); - - if (matches(offer, profile) - && SchedulerUtils.isUniqueHostname(offer, - schedulerState.getActiveTasks())) { - TaskInfo task = taskFactory.createTask(offer, pendingTaskId, - taskToLaunch); - List<OfferID> offerIds = new ArrayList<>(); - offerIds.add(offer.getId()); - List<TaskInfo> tasks = new ArrayList<>(); - tasks.add(task); - LOGGER.info("Launching task: {} using offer: {}", task.getTaskId().getValue(), offer.getId()); - LOGGER.debug("Launching task: {} with profile: {} using offer: {}", task, profile, offer); - - driver.launchTasks(offerIds, tasks); - launchedTaskId = pendingTaskId; - - taskToLaunch.setHostname(offer.getHostname()); - taskToLaunch.setSlaveId(offer.getSlaveId()); - offerMatch = true; - break; - } - } - if (null != launchedTaskId) { - schedulerState.makeTaskStaging(launchedTaskId); - launchedTaskId = null; - } - if (!offerMatch) { - LOGGER.info( - "Declining offer 
{}, as it didn't match any pending task.", - offer); - driver.declineOffer(offer.getId()); - } - } - } else { - LOGGER.info("No pending tasks, declining all offers"); - for (Offer offer : offers) { - driver.declineOffer(offer.getId()); - } ++ private static final Logger LOGGER = LoggerFactory.getLogger(ResourceOffersEventHandler.class); + - private static final Lock driverOperationLock = new ReentrantLock(); ++ private static final Lock driverOperationLock = new ReentrantLock(); + - @Inject - private SchedulerState schedulerState; - - @Inject - private TaskFactory taskFactory; ++ @Inject ++ private SchedulerState schedulerState; + - @Inject - private TaskUtils taskUtils; ++ @Inject ++ private TaskFactory taskFactory; + + @Inject - private OfferLifecycleManager offerLifecycleMgr; - - @Override - public void onEvent(ResourceOffersEvent event, long sequence, - boolean endOfBatch) throws Exception { - SchedulerDriver driver = event.getDriver(); - List<Offer> offers = event.getOffers(); - - LOGGER.info("Received offers {}", offers.size()); - LOGGER.debug("Pending tasks: {}", this.schedulerState.getPendingTaskIds()); - driverOperationLock.lock(); - try { - Set<Protos.TaskID> pendingTasks = schedulerState.getPendingTaskIds(); - if (CollectionUtils.isNotEmpty(pendingTasks)) { - for (Iterator<Offer> iterator = offers.iterator(); iterator.hasNext();) { - Offer offer = iterator.next(); - boolean offerMatch = false; - Protos.TaskID launchedTaskId = null; - for (Protos.TaskID pendingTaskId : pendingTasks) { - NodeTask taskToLaunch = schedulerState - .getTask(pendingTaskId); - NMProfile profile = taskToLaunch.getProfile(); - if (matches(offer, profile) - && SchedulerUtils.isUniqueHostname(offer, - schedulerState.getActiveTasks())) { - TaskInfo task = taskFactory.createTask(offer, pendingTaskId, - taskToLaunch); - List<OfferID> offerIds = new ArrayList<>(); - offerIds.add(offer.getId()); - List<TaskInfo> tasks = new ArrayList<>(); - tasks.add(task); - LOGGER.info("Launching task: {} using offer: {}", task.getTaskId().getValue(), offer.getId()); - LOGGER.debug("Launching task: {} with profile: {} using offer: {}", task, profile, offer); - - driver.launchTasks(offerIds, tasks); - launchedTaskId = pendingTaskId; - - taskToLaunch.setHostname(offer.getHostname()); - taskToLaunch.setSlaveId(offer.getSlaveId()); - offerMatch = true; - iterator.remove(); // remove the used offer from offers list - break; - } - } - if (null != launchedTaskId) { - schedulerState.makeTaskStaging(launchedTaskId); - launchedTaskId = null; - } - if (!offerMatch) { - LOGGER.info( - "Declining offer {}, as it didn't match any pending task.", - offer); - driver.declineOffer(offer.getId()); - } - } - } ++ private TaskUtils taskUtils; + - for (Offer offer : offers) { - if (SchedulerUtils.isEligibleForFineGrainedScaling(offer.getHostname(), schedulerState)) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Picking an offer from slave with hostname {} for fine grained scaling.", - offer.getHostname()); - } - offerLifecycleMgr.addOffers(offer); - } else { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Declining offer {} from slave {}.", offer, offer.getHostname()); - } - driver.declineOffer(offer.getId()); - } ++ @Inject ++ private OfferLifecycleManager offerLifecycleMgr; ++ ++ @Override ++ public void onEvent(ResourceOffersEvent event, long sequence, ++ boolean endOfBatch) throws Exception { ++ SchedulerDriver driver = event.getDriver(); ++ List<Offer> offers = event.getOffers(); ++ ++ LOGGER.info("Received offers {}", offers.size()); ++ 
LOGGER.debug("Pending tasks: {}", this.schedulerState.getPendingTaskIds()); ++ driverOperationLock.lock(); ++ try { ++ Set<Protos.TaskID> pendingTasks = schedulerState.getPendingTaskIds(); ++ if (CollectionUtils.isNotEmpty(pendingTasks)) { ++ for (Iterator<Offer> iterator = offers.iterator(); iterator.hasNext();) { ++ Offer offer = iterator.next(); ++ boolean offerMatch = false; ++ Protos.TaskID launchedTaskId = null; ++ for (Protos.TaskID pendingTaskId : pendingTasks) { ++ NodeTask taskToLaunch = schedulerState ++ .getTask(pendingTaskId); ++ NMProfile profile = taskToLaunch.getProfile(); ++ ++ if (matches(offer, profile) ++ && SchedulerUtils.isUniqueHostname(offer, ++ schedulerState.getActiveTasks())) { ++ TaskInfo task = taskFactory.createTask(offer, pendingTaskId, ++ taskToLaunch); ++ List<OfferID> offerIds = new ArrayList<>(); ++ offerIds.add(offer.getId()); ++ List<TaskInfo> tasks = new ArrayList<>(); ++ tasks.add(task); ++ LOGGER.info("Launching task: {} using offer: {}", task.getTaskId().getValue(), offer.getId()); ++ LOGGER.debug("Launching task: {} with profile: {} using offer: {}", task, profile, offer); ++ ++ driver.launchTasks(offerIds, tasks); ++ launchedTaskId = pendingTaskId; ++ ++ taskToLaunch.setHostname(offer.getHostname()); ++ taskToLaunch.setSlaveId(offer.getSlaveId()); ++ offerMatch = true; ++ iterator.remove(); // remove the used offer from offers list ++ break; } -- } finally { -- driverOperationLock.unlock(); ++ } ++ if (null != launchedTaskId) { ++ schedulerState.makeTaskStaging(launchedTaskId); ++ launchedTaskId = null; ++ } ++ if (!offerMatch) { ++ LOGGER.info( ++ "Declining offer {}, as it didn't match any pending task.", ++ offer); ++ driver.declineOffer(offer.getId()); ++ } } - } - - private boolean matches(Offer offer, NMProfile profile) { - Map<String, Object> results = new HashMap<String, Object>(5); - - for (Resource resource : offer.getResourcesList()) { - if (resourceEvaluators.containsKey(resource.getName())) { - resourceEvaluators.get(resource.getName()).eval(resource, results); - } else { - LOGGER.warn("Ignoring unknown resource type: {}", - resource.getName()); - } ++ } ++ ++ for (Offer offer : offers) { ++ if (SchedulerUtils.isEligibleForFineGrainedScaling(offer.getHostname(), schedulerState)) { ++ if (LOGGER.isDebugEnabled()) { ++ LOGGER.debug("Picking an offer from slave with hostname {} for fine grained scaling.", ++ offer.getHostname()); ++ } ++ offerLifecycleMgr.addOffers(offer); ++ } else { ++ if (LOGGER.isDebugEnabled()) { ++ LOGGER.debug("Declining offer {} from slave {}.", offer, offer.getHostname()); ++ } ++ driver.declineOffer(offer.getId()); + } - double cpus = (Double) results.get("cpus"); - double mem = (Double) results.get("mem"); - int ports = (Integer) results.get("ports"); - - checkResource(cpus < 0, "cpus"); - checkResource(mem < 0, "mem"); - checkResource(ports < 0, "port"); - - return checkAggregates(offer, profile, ports, cpus, mem); ++ } ++ } finally { ++ driverOperationLock.unlock(); + } - - private void checkResource(boolean fail, String resource) { - if (fail) { - LOGGER.info("No " + resource + " resources present"); - } ++ } ++ ++ private boolean matches(Offer offer, NMProfile profile) { ++ Map<String, Object> results = new HashMap<String, Object>(5); ++ ++ for (Resource resource : offer.getResourcesList()) { ++ if (resourceEvaluators.containsKey(resource.getName())) { ++ resourceEvaluators.get(resource.getName()).eval(resource, results); ++ } else { ++ LOGGER.warn("Ignoring unknown resource type: {}", ++ 
resource.getName()); ++ } } ++ double cpus = (Double) results.get("cpus"); ++ double mem = (Double) results.get("mem"); ++ int ports = (Integer) results.get("ports"); - private boolean matches(Offer offer, NMProfile profile) { - double cpus = -1; - double mem = -1; - - for (Resource resource : offer.getResourcesList()) { - if (resource.getName().equals("cpus")) { - if (resource.getType().equals(Value.Type.SCALAR)) { - cpus = resource.getScalar().getValue(); - } else { - LOGGER.error("Cpus resource was not a scalar: {}", resource - .getType().toString()); - } - } else if (resource.getName().equals("mem")) { - if (resource.getType().equals(Value.Type.SCALAR)) { - mem = resource.getScalar().getValue(); - } else { - LOGGER.error("Mem resource was not a scalar: {}", resource - .getType().toString()); - } - } else if (resource.getName().equals("disk")) { - LOGGER.warn("Ignoring disk resources from offer"); - } else if (resource.getName().equals("ports")) { - LOGGER.info("Ignoring ports resources from offer"); - } else { - LOGGER.warn("Ignoring unknown resource type: {}", - resource.getName()); - } - } - private boolean checkAggregates(Offer offer, NMProfile profile, int ports, double cpus, double mem) { - Map<String, String> requestAttributes = new HashMap<>(); ++ checkResource(cpus < 0, "cpus"); ++ checkResource(mem < 0, "mem"); ++ checkResource(ports < 0, "port"); - if (cpus < 0) { - LOGGER.error("No cpus resource present"); - } - if (mem < 0) { - LOGGER.error("No mem resource present"); - if (taskUtils.getAggregateCpus(profile) <= cpus - && taskUtils.getAggregateMemory(profile) <= mem - && SchedulerUtils.isMatchSlaveAttributes(offer, requestAttributes) - && NMPorts.expectedNumPorts() <= ports) { - return true; - } else { - LOGGER.info("Offer not sufficient for task with, cpu: {}, memory: {}, ports: {}", - taskUtils.getAggregateCpus(profile), taskUtils.getAggregateMemory(profile), ports); - return false; -- } - } ++ return checkAggregates(offer, profile, ports, cpus, mem); ++ } - Map<String, String> requestAttributes = new HashMap<>(); - private static Double scalarToDouble(Resource resource, String id) { - Double value = new Double(0.0); - if (resource.getType().equals(Value.Type.SCALAR)) { - value = new Double(resource.getScalar().getValue()); - } else { - LOGGER.error(id + " resource was not a scalar: {}", resource - .getType().toString()); - } - return value; ++ private void checkResource(boolean fail, String resource) { ++ if (fail) { ++ LOGGER.info("No " + resource + " resources present"); + } - - private interface EvalResources { - public void eval(Resource resource, Map<String, Object>results); ++ } ++ ++ private boolean checkAggregates(Offer offer, NMProfile profile, int ports, double cpus, double mem) { ++ Map<String, String> requestAttributes = new HashMap<>(); ++ ++ if (taskUtils.getAggregateCpus(profile) <= cpus ++ && taskUtils.getAggregateMemory(profile) <= mem ++ && SchedulerUtils.isMatchSlaveAttributes(offer, requestAttributes) ++ && NMPorts.expectedNumPorts() <= ports) { ++ return true; ++ } else { ++ LOGGER.info("Offer not sufficient for task with, cpu: {}, memory: {}, ports: {}", ++ taskUtils.getAggregateCpus(profile), taskUtils.getAggregateMemory(profile), ports); ++ return false; + } ++ } ++ ++ private static Double scalarToDouble(Resource resource, String id) { ++ Double value = new Double(0.0); ++ if (resource.getType().equals(Value.Type.SCALAR)) { ++ value = new Double(resource.getScalar().getValue()); ++ } else { ++ LOGGER.error(id + " resource was not a scalar: {}", 
resource ++ .getType().toString()); ++ } ++ return value; ++ } ++ ++ private interface EvalResources { ++ public void eval(Resource resource, Map<String, Object>results); ++ } ++ ++ private static Map<String, EvalResources> resourceEvaluators; ++ ++ static { ++ resourceEvaluators = new HashMap<String, EvalResources>(4); ++ resourceEvaluators.put("cpus", new EvalResources() { ++ public void eval(Resource resource, Map<String, Object> results) { ++ results.put("cpus", scalarToDouble(resource, "cpus")); ++ } ++ }); ++ resourceEvaluators.put("mem", new EvalResources() { ++ public void eval(Resource resource, Map<String, Object> results) { ++ results.put("mem", scalarToDouble(resource, "mem")); ++ } ++ }); ++ resourceEvaluators.put("disk", new EvalResources() { ++ public void eval(Resource resource, Map<String, Object> results) { ++ } ++ }); ++ resourceEvaluators.put("ports", new EvalResources() { ++ public void eval(Resource resource, Map<String, Object> results) { ++ int ports = 0; ++ if (resource.getType().equals(Value.Type.RANGES)) { ++ Value.Ranges ranges = resource.getRanges(); ++ for (Value.Range range : ranges.getRangeList()) { ++ if (range.getBegin() < range.getEnd()) { ++ ports += range.getEnd() - range.getBegin() + 1; ++ } ++ } - if (taskUtils.getAggregateCpus(profile) <= cpus - && taskUtils.getAggregateMemory(profile) <= mem - && SchedulerUtils.isMatchSlaveAttributes(offer, - requestAttributes)) { - return true; - private static Map<String, EvalResources> resourceEvaluators; + } else { - LOGGER.info("Offer not sufficient for task with, cpu: {}, memory: {}", taskUtils.getAggregateCpus(profile), taskUtils.getAggregateMemory(profile)); - return false; - } - } ++ LOGGER.error("ports resource was not Ranges: {}", resource ++ .getType().toString()); - static { - resourceEvaluators = new HashMap<String, EvalResources>(4); - resourceEvaluators.put("cpus", new EvalResources() { - public void eval(Resource resource, Map<String, Object> results) { - results.put("cpus", scalarToDouble(resource, "cpus")); - } - }); - resourceEvaluators.put("mem", new EvalResources() { - public void eval(Resource resource, Map<String, Object> results) { - results.put("mem", scalarToDouble(resource, "mem")); - } - }); - resourceEvaluators.put("disk", new EvalResources() { - public void eval(Resource resource, Map<String, Object> results) { - } - }); - resourceEvaluators.put("ports", new EvalResources() { - public void eval(Resource resource, Map<String, Object> results) { - int ports = 0; - if (resource.getType().equals(Value.Type.RANGES)) { - Value.Ranges ranges = resource.getRanges(); - for (Value.Range range : ranges.getRangeList()) { - if (range.getBegin() < range.getEnd()) { - ports += range.getEnd() - range.getBegin() + 1; - } - } - - } else { - LOGGER.error("ports resource was not Ranges: {}", resource - .getType().toString()); - - } - results.put("ports", Integer.valueOf(ports)); - } - }); - } ++ } ++ results.put("ports", Integer.valueOf(ports)); ++ } ++ }); ++ } }