Merge branch 'phase1' into issue_14

Conflicts:
        
myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java
        
myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/c895fb8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/c895fb8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/c895fb8d

Branch: refs/heads/master
Commit: c895fb8d6dc6b0ad736d3c86b525e86170e52062
Parents: a4ceb36 7145b9b
Author: Santosh Marella <smare...@maprtech.com>
Authored: Mon Aug 10 19:06:16 2015 -0700
Committer: Santosh Marella <smare...@maprtech.com>
Committed: Mon Aug 10 19:06:16 2015 -0700

----------------------------------------------------------------------
 docs/myriad-dev.md                              |   28 +
 .../myriad-remote-distribution-configuration.md |    2 +-
 .../com/ebay/myriad/executor/NMTaskConfig.java  |   37 +
 .../ebay/myriad/executor/MyriadExecutor.java    |   18 +-
 .../java/com/ebay/myriad/scheduler/NMPorts.java |   65 +
 .../com/ebay/myriad/scheduler/TaskFactory.java  |  496 +-
 .../handlers/ResourceOffersEventHandler.java    |  302 +-
 .../webapp/css/bootstrap-default.min.css        |    5 -
 .../resources/webapp/css/bootstrap-myriad.css   | 6492 ++++++++++++++++++
 .../webapp/css/bootstrap-slate.min.css          |    7 -
 .../main/resources/webapp/css/bootstrap.min.css |    7 -
 .../src/main/resources/webapp/css/myriad.css    |   20 +-
 .../main/resources/webapp/img/favicon-16x16.png |  Bin 0 -> 1362 bytes
 .../main/resources/webapp/img/favicon-32x32.png |  Bin 0 -> 2262 bytes
 .../main/resources/webapp/img/favicon-96x96.png |  Bin 0 -> 6872 bytes
 .../src/main/resources/webapp/img/favicon.ico   |  Bin 1150 -> 1150 bytes
 .../main/resources/webapp/img/navbar_logo.png   |  Bin 0 -> 18128 bytes
 .../src/main/resources/webapp/index.html        |    2 +-
 .../webapp/js/components/NavbarComponent.js     |    8 +-
 .../webapp/public/css/bootstrap-myriad-2.css    | 6492 ++++++++++++++++++
 .../webapp/public/css/bootstrap-myriad.css      | 6492 ++++++++++++++++++
 .../main/resources/webapp/public/css/myriad.css |   20 +-
 .../webapp/public/img/favicon-16x16.png         |  Bin 0 -> 1362 bytes
 .../webapp/public/img/favicon-32x32.png         |  Bin 0 -> 2262 bytes
 .../webapp/public/img/favicon-96x96.png         |  Bin 0 -> 6872 bytes
 .../resources/webapp/public/img/favicon.ico     |  Bin 1150 -> 1150 bytes
 .../resources/webapp/public/img/navbar_logo.png |  Bin 0 -> 18128 bytes
 .../src/main/resources/webapp/public/index.html |    2 +-
 .../main/resources/webapp/public/js/myriad.js   | 2142 +++---
 shutdown.sh                                     |    6 +-
 30 files changed, 21335 insertions(+), 1308 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/c895fb8d/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java
----------------------------------------------------------------------
diff --cc 
myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java
index 2d5c62f,42535c5..dc8652c
--- a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java
+++ b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java
@@@ -114,18 -122,9 +123,18 @@@ public class MyriadExecutor implements 
  
      @Override
      public void launchTask(final ExecutorDriver driver, final TaskInfo task) {
 +        // "task id beginning with "yarn" is a mesos task for yarn container
 +        if (task.getTaskId().getValue().startsWith("yarn")) {
 +            TaskStatus status = 
TaskStatus.newBuilder().setTaskId(task.getTaskId())
 +                .setState(TaskState.TASK_RUNNING).build();
 +            driver.sendStatusUpdate(status);
 +            return;
 +        }
 +
 +        // Launch NM as a task and return status to framework
          new Thread(new Runnable() {
              public void run() {
-                 Builder statusBuilder = TaskStatus.newBuilder()
+                 TaskStatus.Builder statusBuilder = TaskStatus.newBuilder()
                          .setTaskId(task.getTaskId());
                  try {
                      NMTaskConfig taskConfig = 
GSON.fromJson(task.getData().toStringUtf8(), NMTaskConfig.class);

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/c895fb8d/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java
----------------------------------------------------------------------
diff --cc 
myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java
index a5aeae3,0e2ce16..723d4cf
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java
@@@ -30,75 -30,97 +31,104 @@@ import java.util.Iterator
  import java.util.Objects;
  
  /**
-  * Factory to create TaskInfo objects.
+  * Creates Tasks based on mesos offers
   */
  public interface TaskFactory {
--    TaskInfo createTask(Offer offer, TaskID taskId, NodeTask nodeTask);
- 
-     // TODO(Santosh): This is needed because the ExecutorInfo constructed
-     // to launch NM needs to be specified to launch placeholder tasks for
-     // yarn containers (for fine grained scaling).
-     // If mesos supports just specifying the 'ExecutorId' without the full
-     // ExecutorInfo, we wouldn't need this interface method.
-     ExecutorInfo getExecutorInfoForSlave(SlaveID slave);
--
--    /**
-      * Creates TaskInfo objects to launch NMs as mesos tasks.
 -     * The Node Manager Task factory implementation
--     */
--    class NMTaskFactoryImpl implements TaskFactory {
--        public static final String EXECUTOR_NAME = "myriad_task";
--        public static final String EXECUTOR_PREFIX = "myriad_executor";
--        public static final String YARN_NODEMANAGER_OPTS_KEY = 
"YARN_NODEMANAGER_OPTS";
--        private static final String YARN_RESOURCEMANAGER_HOSTNAME = 
"yarn.resourcemanager.hostname";
--        private static final String YARN_RESOURCEMANAGER_WEBAPP_ADDRESS = 
"yarn.resourcemanager.webapp.address";
--        private static final String YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS 
= "yarn.resourcemanager.webapp.https.address";
--        private static final String YARN_HTTP_POLICY = "yarn.http.policy";
--        private static final String YARN_HTTP_POLICY_HTTPS_ONLY = 
"HTTPS_ONLY";
--
--        private static final Logger LOGGER = 
LoggerFactory.getLogger(NMTaskFactoryImpl.class);
--        private MyriadConfiguration cfg;
--        private TaskUtils taskUtils;
--
--        @Inject
--        public NMTaskFactoryImpl(MyriadConfiguration cfg, TaskUtils 
taskUtils) {
--            this.cfg = cfg;
--            this.taskUtils = taskUtils;
--        }
++  TaskInfo createTask(Offer offer, TaskID taskId, NodeTask nodeTask);
++
++  // TODO(Santosh): This is needed because the ExecutorInfo constructed
++  // to launch NM needs to be specified to launch placeholder tasks for
++  // yarn containers (for fine grained scaling).
++  // If mesos supports just specifying the 'ExecutorId' without the full
++  // ExecutorInfo, we wouldn't need this interface method.
++  ExecutorInfo getExecutorInfoForSlave(SlaveID slave);
++
++  /**
++   * Creates TaskInfo objects to launch NMs as mesos tasks.
++   */
++  class NMTaskFactoryImpl implements TaskFactory {
++    public static final String EXECUTOR_NAME = "myriad_task";
++    public static final String EXECUTOR_PREFIX = "myriad_executor";
++    public static final String YARN_NODEMANAGER_OPTS_KEY = 
"YARN_NODEMANAGER_OPTS";
++    private static final String YARN_RESOURCEMANAGER_HOSTNAME = 
"yarn.resourcemanager.hostname";
++    private static final String YARN_RESOURCEMANAGER_WEBAPP_ADDRESS = 
"yarn.resourcemanager.webapp.address";
++    private static final String YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS = 
"yarn.resourcemanager.webapp.https.address";
++    private static final String YARN_HTTP_POLICY = "yarn.http.policy";
++    private static final String YARN_HTTP_POLICY_HTTPS_ONLY = "HTTPS_ONLY";
++
++    private static final Logger LOGGER = 
LoggerFactory.getLogger(NMTaskFactoryImpl.class);
++    private MyriadConfiguration cfg;
++    private TaskUtils taskUtils;
++
++    @Inject
++    public NMTaskFactoryImpl(MyriadConfiguration cfg, TaskUtils taskUtils) {
++      this.cfg = cfg;
++      this.taskUtils = taskUtils;
++    }
  
-         private static String getFileName(String uri) {
-             int lastSlash = uri.lastIndexOf('/');
-             if (lastSlash == -1) {
-                 return uri;
-             } else {
-                 String fileName = uri.substring(lastSlash + 1);
-                 Preconditions.checkArgument(!Strings.isNullOrEmpty(fileName),
-                         "URI should not have a slash at the end");
-                 return fileName;
 -        //Utility function to get the first NMPorts.expectedNumPorts number 
of ports of an offer
 -        private static NMPorts getPorts(Offer offer) {
 -            HashSet<Long> ports = new HashSet<>();
 -            for (Resource resource : offer.getResourcesList()){
 -                if (resource.getName().equals("ports")){
++    //Utility function to get the first NMPorts.expectedNumPorts number of 
ports of an offer
++    private static NMPorts getPorts(Offer offer) {
++      HashSet<Long> ports = new HashSet<>();
++      for (Resource resource : offer.getResourcesList()){
++        if (resource.getName().equals("ports")){
+                     /*
+                     ranges.getRangeList() returns a list of ranges, each 
range specifies a begin and end only.
+                     so must loop though each range until we get all ports 
needed.  We exit each loop as soon as all
+                     ports are found so bounded by NMPorts.expectedNumPorts.
+                     */
 -                    Iterator<Value.Range> itr = 
resource.getRanges().getRangeList().iterator();
 -                    while (itr.hasNext() && ports.size() < 
NMPorts.expectedNumPorts()) {
 -                        Value.Range range = itr.next();
 -                        if (range.getBegin() <= range.getEnd()) {
 -                            long i = range.getBegin();
 -                            while (i <= range.getEnd() && ports.size() < 
NMPorts.expectedNumPorts()) {
 -                                ports.add(i);
 -                                i++;
 -                            }
 -                        }
 -                    }
 -                }
++          Iterator<Value.Range> itr = 
resource.getRanges().getRangeList().iterator();
++          while (itr.hasNext() && ports.size() < NMPorts.expectedNumPorts()) {
++            Value.Range range = itr.next();
++            if (range.getBegin() <= range.getEnd()) {
++              long i = range.getBegin();
++              while (i <= range.getEnd() && ports.size() < 
NMPorts.expectedNumPorts()) {
++                ports.add(i);
++                i++;
++              }
              }
 -
 -            Preconditions.checkState(ports.size() == 
NMPorts.expectedNumPorts(), "Not enough ports in offer");
 -            Long [] portArray = ports.toArray(new Long [ports.size()]);
 -            return new NMPorts(portArray);
++          }
          }
++      }
  
-         private String getConfigurationUrl() {
-             YarnConfiguration conf = new YarnConfiguration();
-             String httpPolicy = conf.get(YARN_HTTP_POLICY);
-             if (httpPolicy != null && 
httpPolicy.equals(YARN_HTTP_POLICY_HTTPS_ONLY)) {
-                 String address = 
conf.get(YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS);
-                 if (address == null || address.isEmpty()) {
-                     address = conf.get(YARN_RESOURCEMANAGER_HOSTNAME) + 
":8090";
-                 }
-                 return "https://"; + address + "/conf";
 -        private static String getFileName(String uri) {
 -            int lastSlash = uri.lastIndexOf('/');
 -            if (lastSlash == -1) {
 -                return uri;
--            } else {
-                 String address = 
conf.get(YARN_RESOURCEMANAGER_WEBAPP_ADDRESS);
-                 if (address == null || address.isEmpty()) {
-                     address = conf.get(YARN_RESOURCEMANAGER_HOSTNAME) + 
":8088";
-                 }
-                 return "http://"; + address + "/conf";
 -                String fileName = uri.substring(lastSlash + 1);
 -                Preconditions.checkArgument(!Strings.isNullOrEmpty(fileName),
 -                        "URI should not have a slash at the end");
 -                return fileName;
--            }
 -        }
++      Preconditions.checkState(ports.size() == NMPorts.expectedNumPorts(), 
"Not enough ports in offer");
++      Long [] portArray = ports.toArray(new Long [ports.size()]);
++      return new NMPorts(portArray);
++    }
+ 
 -        private String getConfigurationUrl() {
 -            YarnConfiguration conf = new YarnConfiguration();
 -            String httpPolicy = conf.get(YARN_HTTP_POLICY);
 -            if (httpPolicy != null && 
httpPolicy.equals(YARN_HTTP_POLICY_HTTPS_ONLY)) {
 -                String address = 
conf.get(YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS);
 -                if (address == null || address.isEmpty()) {
 -                    address = conf.get(YARN_RESOURCEMANAGER_HOSTNAME) + 
":8090";
 -                }
 -                return "https://"; + address + "/conf";
 -            } else {
 -                String address = 
conf.get(YARN_RESOURCEMANAGER_WEBAPP_ADDRESS);
 -                if (address == null || address.isEmpty()) {
 -                    address = conf.get(YARN_RESOURCEMANAGER_HOSTNAME) + 
":8088";
 -                }
 -                return "http://"; + address + "/conf";
 -            }
++    private static String getFileName(String uri) {
++      int lastSlash = uri.lastIndexOf('/');
++      if (lastSlash == -1) {
++        return uri;
++      } else {
++        String fileName = uri.substring(lastSlash + 1);
++        Preconditions.checkArgument(!Strings.isNullOrEmpty(fileName),
++            "URI should not have a slash at the end");
++        return fileName;
++      }
++    }
++
++    private String getConfigurationUrl() {
++      YarnConfiguration conf = new YarnConfiguration();
++      String httpPolicy = conf.get(YARN_HTTP_POLICY);
++      if (httpPolicy != null && 
httpPolicy.equals(YARN_HTTP_POLICY_HTTPS_ONLY)) {
++        String address = conf.get(YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS);
++        if (address == null || address.isEmpty()) {
++          address = conf.get(YARN_RESOURCEMANAGER_HOSTNAME) + ":8090";
++        }
++        return "https://"; + address + "/conf";
++      } else {
++        String address = conf.get(YARN_RESOURCEMANAGER_WEBAPP_ADDRESS);
++        if (address == null || address.isEmpty()) {
++          address = conf.get(YARN_RESOURCEMANAGER_HOSTNAME) + ":8088";
          }
++        return "http://"; + address + "/conf";
++      }
++    }
  
-         private Protos.CommandInfo getCommandInfo() {
 -        private CommandInfo getCommandInfo() {
--            MyriadExecutorConfiguration myriadExecutorConfiguration = 
cfg.getMyriadExecutorConfiguration();
--            CommandInfo.Builder commandInfo = CommandInfo.newBuilder();
--            if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
++    private CommandInfo getCommandInfo() {
++      MyriadExecutorConfiguration myriadExecutorConfiguration = 
cfg.getMyriadExecutorConfiguration();
++      CommandInfo.Builder commandInfo = CommandInfo.newBuilder();
++      if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
                  /*
                   TODO(darinj): Overall this is messier than I'd like. We 
can't let mesos untar the distribution, since
                   it will change the permissions.  Instead we simply download 
the tarball and execute tar -xvpf. We also
@@@ -106,168 -128,193 +136,194 @@@
                   frameworkSuperUser. This will be refactored after Mesos-1790 
is resolved.
                  */
  
--                //Both FrameworkUser and FrameworkSuperuser to get all of the 
directory permissions correct.
--                if (!(cfg.getFrameworkUser().isPresent() && 
cfg.getFrameworkSuperUser().isPresent())) {
--                    throw new RuntimeException("Trying to use remote 
distribution, but frameworkUser" +
--                            "and/or frameworkSuperUser not set!");
--                }
--
--                LOGGER.info("Using remote distribution");
--
--                String nmURIString = 
myriadExecutorConfiguration.getNodeManagerUri().get();
--
--                //TODO(DarinJ) support other compression, as this is a temp 
fix for Mesos 1760 may not get to it.
--                //Extract tarball keeping permissions, necessary to keep 
HADOOP_HOME/bin/container-executor suidbit set.
--                String tarCmd = "sudo tar -zxpf " + getFileName(nmURIString);
--
--                //We need the current directory to be writable by 
frameworkUser for capsuleExecutor to create directories.
--                //Best to simply give owenership to the user running the 
executor but we don't want to use -R as this
--                //will silently remove the suid bit on container executor.
--                String chownCmd = "sudo chown " + 
cfg.getFrameworkUser().get() + " .";
--
--                //Place the hadoop config where in the HADOOP_CONF_DIR where 
it will be read by the NodeManager
--                //The url for the resource manager config is: 
http(s)://hostname:port/conf so fetcher.cpp downloads the
--                //config file to conf, It's an xml file with the parameters 
of yarn-site.xml, core-site.xml and hdfs.xml.
--                String configCopyCmd = "cp conf " + 
cfg.getYarnEnvironment().get("YARN_HOME") +
--                        "/etc/hadoop/yarn-site.xml";
--
--                //Command to run the executor
--                String executorPathString = 
myriadExecutorConfiguration.getPath();
--                String executorCmd = "export CAPSULE_CACHE_DIR=`pwd`;echo 
$CAPSULE_CACHE_DIR; " +
--                        "sudo -E -u " + cfg.getFrameworkUser().get() + " -H " 
+
--                        "java -Dcapsule.log=verbose -jar " + 
getFileName(executorPathString);
--
--                //Concatenate all the subcommands
--                String cmd = tarCmd + "&&" + chownCmd + "&&" + configCopyCmd 
+ "&&" + executorCmd;
--
--                //get the nodemanagerURI
--                //We're going to extract ourselves, so setExtract is false
--                LOGGER.info("Getting Hadoop distribution from:" + 
nmURIString);
--                URI nmUri = 
URI.newBuilder().setValue(nmURIString).setExtract(false)
--                        .build();
--
--                //get configs directly from resource manager
--                String configUrlString = getConfigurationUrl();
--                LOGGER.info("Getting config from:" + configUrlString);
--                URI configUri = URI.newBuilder().setValue(configUrlString)
--                        .build();
--
--                //get the executor URI
--                LOGGER.info("Getting executor from:" + executorPathString);
--                URI executorUri = 
URI.newBuilder().setValue(executorPathString).setExecutable(true)
--                        .build();
--
--                LOGGER.info("Slave will execute command:" + cmd);
--                
commandInfo.addUris(nmUri).addUris(configUri).addUris(executorUri).setValue("echo
 \"" + cmd + "\";" + cmd);
--
--                commandInfo.setUser(cfg.getFrameworkSuperUser().get());
--
--            } else {
--                String cmdPrefix = "export CAPSULE_CACHE_DIR=`pwd` ;" +
--                        "echo $CAPSULE_CACHE_DIR; java -Dcapsule.log=verbose 
-jar ";
--                String executorPath = myriadExecutorConfiguration.getPath();
--                String cmd = cmdPrefix + getFileName(executorPath);
--                URI executorURI = URI.newBuilder().setValue(executorPath)
--                        .setExecutable(true).build();
--                commandInfo.addUris(executorURI)
--                        .setValue("echo \"" + cmd + "\";" + cmd);
--
--                if (cfg.getFrameworkUser().isPresent()) {
--                    commandInfo.setUser(cfg.getFrameworkUser().get());
--                }
--            }
--            return commandInfo.build();
++        //Both FrameworkUser and FrameworkSuperuser to get all of the 
directory permissions correct.
++        if (!(cfg.getFrameworkUser().isPresent() && 
cfg.getFrameworkSuperUser().isPresent())) {
++          throw new RuntimeException("Trying to use remote distribution, but 
frameworkUser" +
++              "and/or frameworkSuperUser not set!");
          }
  
--        @Override
--        public TaskInfo createTask(Offer offer, TaskID taskId, NodeTask 
nodeTask) {
--            Objects.requireNonNull(offer, "Offer should be non-null");
--            Objects.requireNonNull(nodeTask, "NodeTask should be non-null");
 -
 -            NMPorts ports = getPorts(offer);
 -            LOGGER.debug(ports.toString());
--
--            NMProfile profile = nodeTask.getProfile();
--            NMTaskConfig nmTaskConfig = new NMTaskConfig();
--            nmTaskConfig.setAdvertisableCpus(profile.getCpus());
--            nmTaskConfig.setAdvertisableMem(profile.getMemory());
--            NodeManagerConfiguration nodeManagerConfiguration = 
this.cfg.getNodeManagerConfiguration();
--            
nmTaskConfig.setJvmOpts(nodeManagerConfiguration.getJvmOpts().orNull());
--            
nmTaskConfig.setCgroups(nodeManagerConfiguration.getCgroups().or(Boolean.FALSE));
 -            nmTaskConfig.setRpcPort(ports.getRpcPort());
 -            nmTaskConfig.setLocalizerPort(ports.getLocalizerPort());
 -            nmTaskConfig.setWebAppHttpPort(ports.getWebAppHttpPort());
 -            nmTaskConfig.setShufflePort(ports.getShufflePort());
--            nmTaskConfig.setYarnEnvironment(cfg.getYarnEnvironment());
--
--            // if RM's hostname is passed in as a system property, pass it 
along
--            // to Node Managers launched via Myriad
--            String rmHostName = 
System.getProperty(YARN_RESOURCEMANAGER_HOSTNAME);
--            if (rmHostName != null && !rmHostName.isEmpty()) {
--
--                String nmOpts = 
nmTaskConfig.getYarnEnvironment().get(YARN_NODEMANAGER_OPTS_KEY);
--                if (nmOpts == null) {
--                    nmOpts = "";
--                }
--                nmOpts += " " + "-D" + YARN_RESOURCEMANAGER_HOSTNAME + "=" + 
rmHostName;
--                
nmTaskConfig.getYarnEnvironment().put(YARN_NODEMANAGER_OPTS_KEY, nmOpts);
--                LOGGER.info(YARN_RESOURCEMANAGER_HOSTNAME + " is set to " + 
rmHostName +
--                        " via YARN_RESOURCEMANAGER_OPTS. Passing it into 
YARN_NODEMANAGER_OPTS.");
--            }
--//            else {
--            // TODO(Santosh): Handle this case. Couple of options:
--            // 1. Lookup a hostname here and use it as "RM's hostname"
--            // 2. Abort here.. RM cannot start unless a hostname is passed in 
as it requires it to pass to NMs.
--
--            String taskConfigJSON = new Gson().toJson(nmTaskConfig);
--
-             Scalar taskMemory = Value.Scalar.newBuilder()
 -            Scalar taskMemory = Scalar.newBuilder()
--                    .setValue(taskUtils.getTaskMemory(profile))
--                    .build();
-             Scalar taskCpus = Value.Scalar.newBuilder()
 -            Scalar taskCpus = Scalar.newBuilder()
--                    .setValue(taskUtils.getTaskCpus(profile))
--                    .build();
-             ExecutorInfo executorInfo = 
getExecutorInfoForSlave(offer.getSlaveId());
 -            Scalar executorMemory = Scalar.newBuilder()
 -                    .setValue(taskUtils.getExecutorMemory())
 -                    .build();
 -            Scalar executorCpus = Scalar.newBuilder()
 -                    .setValue(taskUtils.getExecutorCpus())
 -                    .build();
 -
 -            CommandInfo commandInfo = getCommandInfo();
 -
 -            ExecutorID executorId = ExecutorID.newBuilder()
 -                    .setValue(EXECUTOR_PREFIX + offer.getSlaveId().getValue())
 -                    .build();
 -            ExecutorInfo executorInfo = ExecutorInfo
 -                    .newBuilder()
 -                    .setCommand(commandInfo)
 -                    .setName(EXECUTOR_NAME)
 -                    .addResources(
 -                            Resource.newBuilder().setName("cpus")
 -                                    .setType(Value.Type.SCALAR)
 -                                    .setScalar(executorCpus)
 -                                    .build())
 -                    .addResources(
 -                            Resource.newBuilder().setName("mem")
 -                                    .setType(Value.Type.SCALAR)
 -                                    .setScalar(executorMemory)
 -                                    .build())
 -                    
.setExecutorId(executorId).setCommand(commandInfo).build();
--
--            TaskInfo.Builder taskBuilder = TaskInfo.newBuilder()
--                    .setName("task-" + taskId.getValue())
--                    .setTaskId(taskId)
--                    .setSlaveId(offer.getSlaveId());
--
-             // TODO (mohit): Configure ports for multi-tenancy
--            ByteString data = 
ByteString.copyFrom(taskConfigJSON.getBytes(Charset.defaultCharset()));
--            return taskBuilder
--                    .addResources(
--                            Resource.newBuilder().setName("cpus")
--                                    .setType(Value.Type.SCALAR)
--                                    .setScalar(taskCpus)
--                                    .build())
--                    .addResources(
--                            Resource.newBuilder().setName("mem")
--                                    .setType(Value.Type.SCALAR)
--                                    .setScalar(taskMemory)
--                                    .build())
 -                    .addResources(
 -                            Resource.newBuilder().setName("ports")
 -                                    .setType(Value.Type.RANGES)
 -                                    .setRanges(Value.Ranges.newBuilder()
 -                                            .addRange(Value.Range.newBuilder()
 -                                                    
.setBegin(ports.getRpcPort())
 -                                                    
.setEnd(ports.getRpcPort())
 -                                                    .build())
 -                                            .addRange(Value.Range.newBuilder()
 -                                                    
.setBegin(ports.getLocalizerPort())
 -                                                    
.setEnd(ports.getLocalizerPort())
 -                                                    .build())
 -                                            .addRange(Value.Range.newBuilder()
 -                                                    
.setBegin(ports.getWebAppHttpPort())
 -                                                    
.setEnd(ports.getWebAppHttpPort())
 -                                                    .build())
 -                                            .addRange(Value.Range.newBuilder()
 -                                                    
.setBegin(ports.getShufflePort())
 -                                                    
.setEnd(ports.getShufflePort())
 -                                                    .build())))
--                    .setExecutor(executorInfo).setData(data).build();
++        LOGGER.info("Using remote distribution");
++
++        String nmURIString = 
myriadExecutorConfiguration.getNodeManagerUri().get();
++
++        //TODO(DarinJ) support other compression, as this is a temp fix for 
Mesos 1760 may not get to it.
++        //Extract tarball keeping permissions, necessary to keep 
HADOOP_HOME/bin/container-executor suidbit set.
++        String tarCmd = "sudo tar -zxpf " + getFileName(nmURIString);
++
++        //We need the current directory to be writable by frameworkUser for 
capsuleExecutor to create directories.
++        //Best to simply give owenership to the user running the executor but 
we don't want to use -R as this
++        //will silently remove the suid bit on container executor.
++        String chownCmd = "sudo chown " + cfg.getFrameworkUser().get() + " .";
++
++        //Place the hadoop config where in the HADOOP_CONF_DIR where it will 
be read by the NodeManager
++        //The url for the resource manager config is: 
http(s)://hostname:port/conf so fetcher.cpp downloads the
++        //config file to conf, It's an xml file with the parameters of 
yarn-site.xml, core-site.xml and hdfs.xml.
++        String configCopyCmd = "cp conf " + 
cfg.getYarnEnvironment().get("YARN_HOME") +
++            "/etc/hadoop/yarn-site.xml";
++
++        //Command to run the executor
++        String executorPathString = myriadExecutorConfiguration.getPath();
++        String executorCmd = "export CAPSULE_CACHE_DIR=`pwd`;echo 
$CAPSULE_CACHE_DIR; " +
++            "sudo -E -u " + cfg.getFrameworkUser().get() + " -H " +
++            "java -Dcapsule.log=verbose -jar " + 
getFileName(executorPathString);
++
++        //Concatenate all the subcommands
++        String cmd = tarCmd + "&&" + chownCmd + "&&" + configCopyCmd + "&&" + 
executorCmd;
++
++        //get the nodemanagerURI
++        //We're going to extract ourselves, so setExtract is false
++        LOGGER.info("Getting Hadoop distribution from:" + nmURIString);
++        URI nmUri = URI.newBuilder().setValue(nmURIString).setExtract(false)
++            .build();
++
++        //get configs directly from resource manager
++        String configUrlString = getConfigurationUrl();
++        LOGGER.info("Getting config from:" + configUrlString);
++        URI configUri = URI.newBuilder().setValue(configUrlString)
++            .build();
++
++        //get the executor URI
++        LOGGER.info("Getting executor from:" + executorPathString);
++        URI executorUri = 
URI.newBuilder().setValue(executorPathString).setExecutable(true)
++            .build();
++
++        LOGGER.info("Slave will execute command:" + cmd);
++        
commandInfo.addUris(nmUri).addUris(configUri).addUris(executorUri).setValue("echo
 \"" + cmd + "\";" + cmd);
++
++        commandInfo.setUser(cfg.getFrameworkSuperUser().get());
++
++      } else {
++        String cmdPrefix = "export CAPSULE_CACHE_DIR=`pwd` ;" +
++            "echo $CAPSULE_CACHE_DIR; java -Dcapsule.log=verbose -jar ";
++        String executorPath = myriadExecutorConfiguration.getPath();
++        String cmd = cmdPrefix + getFileName(executorPath);
++        URI executorURI = URI.newBuilder().setValue(executorPath)
++            .setExecutable(true).build();
++        commandInfo.addUris(executorURI)
++            .setValue("echo \"" + cmd + "\";" + cmd);
++
++        if (cfg.getFrameworkUser().isPresent()) {
++          commandInfo.setUser(cfg.getFrameworkUser().get());
 +        }
++      }
++      return commandInfo.build();
++    }
 +
-         @Override
-         public ExecutorInfo getExecutorInfoForSlave(SlaveID slave) {
-             Scalar executorMemory = Scalar.newBuilder()
-                 .setValue(taskUtils.getExecutorMemory()).build();
-             Scalar executorCpus = Scalar.newBuilder()
-                 .setValue(taskUtils.getExecutorCpus()).build();
- 
-             CommandInfo commandInfo = getCommandInfo();
- 
-             ExecutorID executorId = ExecutorID.newBuilder()
-                 .setValue(EXECUTOR_PREFIX + slave.getValue())
-                 .build();
-             return ExecutorInfo
-                 .newBuilder()
-                 .setCommand(commandInfo)
-                 .setName(EXECUTOR_NAME)
-                 .addResources(
-                     Resource.newBuilder().setName("cpus")
-                         .setType(Value.Type.SCALAR)
-                         .setScalar(executorCpus).build())
-                 .addResources(
-                     Resource.newBuilder().setName("mem")
-                         .setType(Value.Type.SCALAR)
-                         .setScalar(executorMemory).build())
-                 .setExecutorId(executorId).build();
++    @Override
++    public TaskInfo createTask(Offer offer, TaskID taskId, NodeTask nodeTask) 
{
++      Objects.requireNonNull(offer, "Offer should be non-null");
++      Objects.requireNonNull(nodeTask, "NodeTask should be non-null");
++
++      NMPorts ports = getPorts(offer);
++      LOGGER.debug(ports.toString());
++
++      NMProfile profile = nodeTask.getProfile();
++      NMTaskConfig nmTaskConfig = new NMTaskConfig();
++      nmTaskConfig.setAdvertisableCpus(profile.getCpus());
++      nmTaskConfig.setAdvertisableMem(profile.getMemory());
++      NodeManagerConfiguration nodeManagerConfiguration = 
this.cfg.getNodeManagerConfiguration();
++      nmTaskConfig.setJvmOpts(nodeManagerConfiguration.getJvmOpts().orNull());
++      
nmTaskConfig.setCgroups(nodeManagerConfiguration.getCgroups().or(Boolean.FALSE));
++      nmTaskConfig.setRpcPort(ports.getRpcPort());
++      nmTaskConfig.setLocalizerPort(ports.getLocalizerPort());
++      nmTaskConfig.setWebAppHttpPort(ports.getWebAppHttpPort());
++      nmTaskConfig.setShufflePort(ports.getShufflePort());
++      nmTaskConfig.setYarnEnvironment(cfg.getYarnEnvironment());
++
++      // if RM's hostname is passed in as a system property, pass it along
++      // to Node Managers launched via Myriad
++      String rmHostName = System.getProperty(YARN_RESOURCEMANAGER_HOSTNAME);
++      if (rmHostName != null && !rmHostName.isEmpty()) {
++
++        String nmOpts = 
nmTaskConfig.getYarnEnvironment().get(YARN_NODEMANAGER_OPTS_KEY);
++        if (nmOpts == null) {
++          nmOpts = "";
          }
++        nmOpts += " " + "-D" + YARN_RESOURCEMANAGER_HOSTNAME + "=" + 
rmHostName;
++        nmTaskConfig.getYarnEnvironment().put(YARN_NODEMANAGER_OPTS_KEY, 
nmOpts);
++        LOGGER.info(YARN_RESOURCEMANAGER_HOSTNAME + " is set to " + 
rmHostName +
++            " via YARN_RESOURCEMANAGER_OPTS. Passing it into 
YARN_NODEMANAGER_OPTS.");
++      }
++//            else {
++      // TODO(Santosh): Handle this case. Couple of options:
++      // 1. Lookup a hostname here and use it as "RM's hostname"
++      // 2. Abort here.. RM cannot start unless a hostname is passed in as it 
requires it to pass to NMs.
++
++      String taskConfigJSON = new Gson().toJson(nmTaskConfig);
++
++      Scalar taskMemory = Scalar.newBuilder()
++          .setValue(taskUtils.getTaskMemory(profile))
++          .build();
++      Scalar taskCpus = Scalar.newBuilder()
++          .setValue(taskUtils.getTaskCpus(profile))
++          .build();
++      ExecutorInfo executorInfo = getExecutorInfoForSlave(offer.getSlaveId());
++
++      TaskInfo.Builder taskBuilder = TaskInfo.newBuilder()
++          .setName("task-" + taskId.getValue())
++          .setTaskId(taskId)
++          .setSlaveId(offer.getSlaveId());
++
++      ByteString data = 
ByteString.copyFrom(taskConfigJSON.getBytes(Charset.defaultCharset()));
++      return taskBuilder
++          .addResources(
++              Resource.newBuilder().setName("cpus")
++                  .setType(Value.Type.SCALAR)
++                  .setScalar(taskCpus)
++                  .build())
++          .addResources(
++              Resource.newBuilder().setName("mem")
++                  .setType(Value.Type.SCALAR)
++                  .setScalar(taskMemory)
++                  .build())
++          .addResources(
++              Resource.newBuilder().setName("ports")
++                  .setType(Value.Type.RANGES)
++                  .setRanges(Value.Ranges.newBuilder()
++                      .addRange(Value.Range.newBuilder()
++                          .setBegin(ports.getRpcPort())
++                          .setEnd(ports.getRpcPort())
++                          .build())
++                      .addRange(Value.Range.newBuilder()
++                          .setBegin(ports.getLocalizerPort())
++                          .setEnd(ports.getLocalizerPort())
++                          .build())
++                      .addRange(Value.Range.newBuilder()
++                          .setBegin(ports.getWebAppHttpPort())
++                          .setEnd(ports.getWebAppHttpPort())
++                          .build())
++                      .addRange(Value.Range.newBuilder()
++                          .setBegin(ports.getShufflePort())
++                          .setEnd(ports.getShufflePort())
++                          .build())))
++          .setExecutor(executorInfo).setData(data).build();
++    }
++
++    @Override
++    public ExecutorInfo getExecutorInfoForSlave(SlaveID slave) {
++      Scalar executorMemory = Scalar.newBuilder()
++          .setValue(taskUtils.getExecutorMemory()).build();
++      Scalar executorCpus = Scalar.newBuilder()
++          .setValue(taskUtils.getExecutorCpus()).build();
++
++      CommandInfo commandInfo = getCommandInfo();
++
++      ExecutorID executorId = ExecutorID.newBuilder()
++          .setValue(EXECUTOR_PREFIX + slave.getValue())
++          .build();
++      return ExecutorInfo
++          .newBuilder()
++          .setCommand(commandInfo)
++          .setName(EXECUTOR_NAME)
++          .addResources(
++              Resource.newBuilder().setName("cpus")
++                  .setType(Value.Type.SCALAR)
++                  .setScalar(executorCpus).build())
++          .addResources(
++              Resource.newBuilder().setName("mem")
++                  .setType(Value.Type.SCALAR)
++                  .setScalar(executorMemory).build())
++          .setExecutorId(executorId).build();
      }
++  }
  }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/c895fb8d/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
----------------------------------------------------------------------
diff --cc 
myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
index 554f966,c009ce8..fbe93d6
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
@@@ -15,9 -15,12 +15,13 @@@
   */
  package com.ebay.myriad.scheduler.event.handlers;
  
- import com.ebay.myriad.scheduler.*;
+ import com.ebay.myriad.scheduler.NMPorts;
+ import com.ebay.myriad.scheduler.NMProfile;
+ import com.ebay.myriad.scheduler.SchedulerUtils;
+ import com.ebay.myriad.scheduler.TaskFactory;
+ import com.ebay.myriad.scheduler.TaskUtils;
  import com.ebay.myriad.scheduler.event.ResourceOffersEvent;
 +import com.ebay.myriad.scheduler.fgs.OfferLifecycleManager;
  import com.ebay.myriad.state.NodeTask;
  import com.ebay.myriad.state.SchedulerState;
  import com.lmax.disruptor.EventHandler;
@@@ -48,143 -48,176 +52,191 @@@ import java.util.concurrent.locks.Reent
   * handles and logs resource offers events
   */
  public class ResourceOffersEventHandler implements 
EventHandler<ResourceOffersEvent> {
--    private static final Logger LOGGER = 
LoggerFactory.getLogger(ResourceOffersEventHandler.class);
 -
 -    private static final Lock driverOperationLock = new ReentrantLock();
 -
 -    @Inject
 -    private SchedulerState schedulerState;
 -
 -    @Inject
 -    private TaskFactory taskFactory;
 -
 -    @Inject
 -    private TaskUtils taskUtils;
 -
 -    @Override
 -    public void onEvent(ResourceOffersEvent event, long sequence,
 -                        boolean endOfBatch) throws Exception {
 -        SchedulerDriver driver = event.getDriver();
 -        List<Offer> offers = event.getOffers();
 -
 -        LOGGER.info("Received offers {}", offers.size());
 -        LOGGER.debug("Pending tasks: {}", 
this.schedulerState.getPendingTaskIds());
 -        driverOperationLock.lock();
 -        try {
 -            Set<Protos.TaskID> pendingTasks = 
schedulerState.getPendingTaskIds();
 -            if (CollectionUtils.isNotEmpty(pendingTasks)) {
 -                for (Offer offer : offers) {
 -                    boolean offerMatch = false;
 -                    Protos.TaskID launchedTaskId = null;
 -                    for (Protos.TaskID pendingTaskId : pendingTasks) {
 -                        NodeTask taskToLaunch = schedulerState
 -                                .getTask(pendingTaskId);
 -                        NMProfile profile = taskToLaunch.getProfile();
 -
 -                        if (matches(offer, profile)
 -                                && SchedulerUtils.isUniqueHostname(offer,
 -                                schedulerState.getActiveTasks())) {
 -                            TaskInfo task = taskFactory.createTask(offer, 
pendingTaskId,
 -                                    taskToLaunch);
 -                            List<OfferID> offerIds = new ArrayList<>();
 -                            offerIds.add(offer.getId());
 -                            List<TaskInfo> tasks = new ArrayList<>();
 -                            tasks.add(task);
 -                            LOGGER.info("Launching task: {} using offer: {}", 
task.getTaskId().getValue(), offer.getId());
 -                            LOGGER.debug("Launching task: {} with profile: {} 
using offer: {}", task, profile, offer);
 -
 -                            driver.launchTasks(offerIds, tasks);
 -                            launchedTaskId = pendingTaskId;
 -
 -                            taskToLaunch.setHostname(offer.getHostname());
 -                            taskToLaunch.setSlaveId(offer.getSlaveId());
 -                            offerMatch = true;
 -                            break;
 -                        }
 -                    }
 -                    if (null != launchedTaskId) {
 -                        schedulerState.makeTaskStaging(launchedTaskId);
 -                        launchedTaskId = null;
 -                    }
 -                    if (!offerMatch) {
 -                        LOGGER.info(
 -                                "Declining offer {}, as it didn't match any 
pending task.",
 -                                offer);
 -                        driver.declineOffer(offer.getId());
 -                    }
 -                }
 -            } else {
 -                LOGGER.info("No pending tasks, declining all offers");
 -                for (Offer offer : offers) {
 -                    driver.declineOffer(offer.getId());
 -                }
++  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResourceOffersEventHandler.class);
 +
-     private static final Lock driverOperationLock = new ReentrantLock();
++  private static final Lock driverOperationLock = new ReentrantLock();
 +
-     @Inject
-     private SchedulerState schedulerState;
- 
-     @Inject
-     private TaskFactory taskFactory;
++  @Inject
++  private SchedulerState schedulerState;
 +
-     @Inject
-     private TaskUtils taskUtils;
++  @Inject
++  private TaskFactory taskFactory;
 +
 +  @Inject
-     private OfferLifecycleManager offerLifecycleMgr;
- 
-     @Override
-     public void onEvent(ResourceOffersEvent event, long sequence,
-                         boolean endOfBatch) throws Exception {
-         SchedulerDriver driver = event.getDriver();
-         List<Offer> offers = event.getOffers();
- 
-         LOGGER.info("Received offers {}", offers.size());
-         LOGGER.debug("Pending tasks: {}", 
this.schedulerState.getPendingTaskIds());
-         driverOperationLock.lock();
-         try {
-             Set<Protos.TaskID> pendingTasks = 
schedulerState.getPendingTaskIds();
-             if (CollectionUtils.isNotEmpty(pendingTasks)) {
-                 for (Iterator<Offer> iterator = offers.iterator(); 
iterator.hasNext();) {
-                     Offer offer = iterator.next();
-                     boolean offerMatch = false;
-                     Protos.TaskID launchedTaskId = null;
-                     for (Protos.TaskID pendingTaskId : pendingTasks) {
-                         NodeTask taskToLaunch = schedulerState
-                                 .getTask(pendingTaskId);
-                         NMProfile profile = taskToLaunch.getProfile();
-                         if (matches(offer, profile)
-                                 && SchedulerUtils.isUniqueHostname(offer,
-                                 schedulerState.getActiveTasks())) {
-                             TaskInfo task = taskFactory.createTask(offer, 
pendingTaskId,
-                                     taskToLaunch);
-                             List<OfferID> offerIds = new ArrayList<>();
-                             offerIds.add(offer.getId());
-                             List<TaskInfo> tasks = new ArrayList<>();
-                             tasks.add(task);
-                             LOGGER.info("Launching task: {} using offer: {}", 
task.getTaskId().getValue(), offer.getId());
-                             LOGGER.debug("Launching task: {} with profile: {} 
using offer: {}", task, profile, offer);
- 
-                             driver.launchTasks(offerIds, tasks);
-                             launchedTaskId = pendingTaskId;
- 
-                             taskToLaunch.setHostname(offer.getHostname());
-                             taskToLaunch.setSlaveId(offer.getSlaveId());
-                             offerMatch = true;
-                             iterator.remove(); // remove the used offer from 
offers list
-                             break;
-                         }
-                     }
-                     if (null != launchedTaskId) {
-                         schedulerState.makeTaskStaging(launchedTaskId);
-                         launchedTaskId = null;
-                     }
-                     if (!offerMatch) {
-                         LOGGER.info(
-                                 "Declining offer {}, as it didn't match any 
pending task.",
-                                 offer);
-                         driver.declineOffer(offer.getId());
-                     }
-                 }
-             }
++  private TaskUtils taskUtils;
 +
-             for (Offer offer : offers) {
-               if 
(SchedulerUtils.isEligibleForFineGrainedScaling(offer.getHostname(), 
schedulerState)) {
-                 if (LOGGER.isDebugEnabled()) {
-                   LOGGER.debug("Picking an offer from slave with hostname {} 
for fine grained scaling.",
-                       offer.getHostname());
-                 }
-                 offerLifecycleMgr.addOffers(offer);
-               } else {
-                 if (LOGGER.isDebugEnabled()) {
-                   LOGGER.debug("Declining offer {} from slave {}.", offer, 
offer.getHostname());
-                 }
-                 driver.declineOffer(offer.getId());
-               }
++  @Inject
++  private OfferLifecycleManager offerLifecycleMgr;
++
++  @Override
++  public void onEvent(ResourceOffersEvent event, long sequence,
++                      boolean endOfBatch) throws Exception {
++    SchedulerDriver driver = event.getDriver();
++    List<Offer> offers = event.getOffers();
++
++    LOGGER.info("Received offers {}", offers.size());
++    LOGGER.debug("Pending tasks: {}", 
this.schedulerState.getPendingTaskIds());
++    driverOperationLock.lock();
++    try {
++      Set<Protos.TaskID> pendingTasks = schedulerState.getPendingTaskIds();
++      if (CollectionUtils.isNotEmpty(pendingTasks)) {
++        for (Iterator<Offer> iterator = offers.iterator(); 
iterator.hasNext();) {
++          Offer offer = iterator.next();
++          boolean offerMatch = false;
++          Protos.TaskID launchedTaskId = null;
++          for (Protos.TaskID pendingTaskId : pendingTasks) {
++            NodeTask taskToLaunch = schedulerState
++                .getTask(pendingTaskId);
++            NMProfile profile = taskToLaunch.getProfile();
++
++            if (matches(offer, profile)
++                && SchedulerUtils.isUniqueHostname(offer,
++                schedulerState.getActiveTasks())) {
++              TaskInfo task = taskFactory.createTask(offer, pendingTaskId,
++                  taskToLaunch);
++              List<OfferID> offerIds = new ArrayList<>();
++              offerIds.add(offer.getId());
++              List<TaskInfo> tasks = new ArrayList<>();
++              tasks.add(task);
++              LOGGER.info("Launching task: {} using offer: {}", 
task.getTaskId().getValue(), offer.getId());
++              LOGGER.debug("Launching task: {} with profile: {} using offer: 
{}", task, profile, offer);
++
++              driver.launchTasks(offerIds, tasks);
++              launchedTaskId = pendingTaskId;
++
++              taskToLaunch.setHostname(offer.getHostname());
++              taskToLaunch.setSlaveId(offer.getSlaveId());
++              offerMatch = true;
++              iterator.remove(); // remove the used offer from offers list
++              break;
              }
--        } finally {
--            driverOperationLock.unlock();
++          }
++          if (null != launchedTaskId) {
++            schedulerState.makeTaskStaging(launchedTaskId);
++            launchedTaskId = null;
++          }
++          if (!offerMatch) {
++            LOGGER.info(
++                "Declining offer {}, as it didn't match any pending task.",
++                offer);
++            driver.declineOffer(offer.getId());
++          }
          }
 -    }
 -
 -    private boolean matches(Offer offer, NMProfile profile) {
 -        Map<String, Object> results = new HashMap<String, Object>(5);
 -
 -        for (Resource resource : offer.getResourcesList()) {
 -            if (resourceEvaluators.containsKey(resource.getName())) {
 -                resourceEvaluators.get(resource.getName()).eval(resource, 
results);
 -            } else {
 -                LOGGER.warn("Ignoring unknown resource type: {}",
 -                        resource.getName());
 -            }
++      }
++
++      for (Offer offer : offers) {
++        if 
(SchedulerUtils.isEligibleForFineGrainedScaling(offer.getHostname(), 
schedulerState)) {
++          if (LOGGER.isDebugEnabled()) {
++            LOGGER.debug("Picking an offer from slave with hostname {} for 
fine grained scaling.",
++                offer.getHostname());
++          }
++          offerLifecycleMgr.addOffers(offer);
++        } else {
++          if (LOGGER.isDebugEnabled()) {
++            LOGGER.debug("Declining offer {} from slave {}.", offer, 
offer.getHostname());
++          }
++          driver.declineOffer(offer.getId());
+         }
 -        double cpus = (Double) results.get("cpus");
 -        double mem = (Double) results.get("mem");
 -        int ports = (Integer) results.get("ports");
 -
 -        checkResource(cpus < 0, "cpus");
 -        checkResource(mem < 0, "mem");
 -        checkResource(ports < 0, "port");
 -
 -        return checkAggregates(offer, profile, ports, cpus, mem);
++      }
++    } finally {
++      driverOperationLock.unlock();
+     }
 -
 -    private void checkResource(boolean fail, String resource) {
 -        if (fail) {
 -            LOGGER.info("No " + resource + " resources present");
 -        }
++  }
++
++  private boolean matches(Offer offer, NMProfile profile) {
++    Map<String, Object> results = new HashMap<String, Object>(5);
++
++    for (Resource resource : offer.getResourcesList()) {
++      if (resourceEvaluators.containsKey(resource.getName())) {
++        resourceEvaluators.get(resource.getName()).eval(resource, results);
++      } else {
++        LOGGER.warn("Ignoring unknown resource type: {}",
++            resource.getName());
++      }
      }
++    double cpus = (Double) results.get("cpus");
++    double mem = (Double) results.get("mem");
++    int ports = (Integer) results.get("ports");
  
-     private boolean matches(Offer offer, NMProfile profile) {
-         double cpus = -1;
-         double mem = -1;
- 
-         for (Resource resource : offer.getResourcesList()) {
-             if (resource.getName().equals("cpus")) {
-                 if (resource.getType().equals(Value.Type.SCALAR)) {
-                     cpus = resource.getScalar().getValue();
-                 } else {
-                     LOGGER.error("Cpus resource was not a scalar: {}", 
resource
-                             .getType().toString());
-                 }
-             } else if (resource.getName().equals("mem")) {
-                 if (resource.getType().equals(Value.Type.SCALAR)) {
-                     mem = resource.getScalar().getValue();
-                 } else {
-                     LOGGER.error("Mem resource was not a scalar: {}", resource
-                             .getType().toString());
-                 }
-             } else if (resource.getName().equals("disk")) {
-                 LOGGER.warn("Ignoring disk resources from offer");
-             } else if (resource.getName().equals("ports")) {
-                 LOGGER.info("Ignoring ports resources from offer");
-             } else {
-                 LOGGER.warn("Ignoring unknown resource type: {}",
-                         resource.getName());
-             }
-         }
 -    private boolean checkAggregates(Offer offer, NMProfile profile, int 
ports, double cpus, double mem) {
 -        Map<String, String> requestAttributes = new HashMap<>();
++    checkResource(cpus < 0, "cpus");
++    checkResource(mem < 0, "mem");
++    checkResource(ports < 0, "port");
  
-         if (cpus < 0) {
-             LOGGER.error("No cpus resource present");
-         }
-         if (mem < 0) {
-             LOGGER.error("No mem resource present");
 -        if (taskUtils.getAggregateCpus(profile) <= cpus
 -                && taskUtils.getAggregateMemory(profile) <= mem
 -                && SchedulerUtils.isMatchSlaveAttributes(offer, 
requestAttributes)
 -                && NMPorts.expectedNumPorts() <= ports) {
 -            return true;
 -        } else {
 -            LOGGER.info("Offer not sufficient for task with, cpu: {}, memory: 
{}, ports: {}",
 -                    taskUtils.getAggregateCpus(profile), 
taskUtils.getAggregateMemory(profile), ports);
 -            return false;
--        }
 -    }
++    return checkAggregates(offer, profile, ports, cpus, mem);
++  }
  
-         Map<String, String> requestAttributes = new HashMap<>();
 -    private static Double scalarToDouble(Resource resource, String id) {
 -        Double value = new Double(0.0);
 -        if (resource.getType().equals(Value.Type.SCALAR)) {
 -            value = new Double(resource.getScalar().getValue());
 -        } else {
 -            LOGGER.error(id + " resource was not a scalar: {}", resource
 -                    .getType().toString());
 -        }
 -        return value;
++  private void checkResource(boolean fail, String resource) {
++    if (fail) {
++      LOGGER.info("No " + resource + " resources present");
+     }
 -
 -    private interface EvalResources {
 -        public void eval(Resource resource, Map<String, Object>results);
++  }
++
++  private boolean checkAggregates(Offer offer, NMProfile profile, int ports, 
double cpus, double mem) {
++    Map<String, String> requestAttributes = new HashMap<>();
++
++    if (taskUtils.getAggregateCpus(profile) <= cpus
++        && taskUtils.getAggregateMemory(profile) <= mem
++        && SchedulerUtils.isMatchSlaveAttributes(offer, requestAttributes)
++        && NMPorts.expectedNumPorts() <= ports) {
++      return true;
++    } else {
++      LOGGER.info("Offer not sufficient for task with, cpu: {}, memory: {}, 
ports: {}",
++          taskUtils.getAggregateCpus(profile), 
taskUtils.getAggregateMemory(profile), ports);
++      return false;
+     }
++  }
++
++  private static Double scalarToDouble(Resource resource, String id) {
++    Double value = new Double(0.0);
++    if (resource.getType().equals(Value.Type.SCALAR)) {
++      value = new Double(resource.getScalar().getValue());
++    } else {
++      LOGGER.error(id + " resource was not a scalar: {}", resource
++          .getType().toString());
++    }
++    return value;
++  }
++
++  private interface EvalResources {
++    public void eval(Resource resource, Map<String, Object>results);
++  }
++
++  private static Map<String, EvalResources> resourceEvaluators;
++
++  static {
++    resourceEvaluators = new HashMap<String, EvalResources>(4);
++    resourceEvaluators.put("cpus", new EvalResources() {
++      public void eval(Resource resource, Map<String, Object> results) {
++        results.put("cpus", scalarToDouble(resource, "cpus"));
++      }
++    });
++    resourceEvaluators.put("mem", new EvalResources() {
++      public void eval(Resource resource, Map<String, Object> results) {
++        results.put("mem", scalarToDouble(resource, "mem"));
++      }
++    });
++    resourceEvaluators.put("disk", new EvalResources() {
++      public void eval(Resource resource, Map<String, Object> results) {
++      }
++    });
++    resourceEvaluators.put("ports", new EvalResources() {
++      public void eval(Resource resource, Map<String, Object> results) {
++        int ports = 0;
++        if (resource.getType().equals(Value.Type.RANGES)) {
++          Value.Ranges ranges = resource.getRanges();
++          for (Value.Range range : ranges.getRangeList()) {
++            if (range.getBegin() < range.getEnd()) {
++              ports += range.getEnd() - range.getBegin() + 1;
++            }
++          }
  
-         if (taskUtils.getAggregateCpus(profile) <= cpus
-                 && taskUtils.getAggregateMemory(profile) <= mem
-                 && SchedulerUtils.isMatchSlaveAttributes(offer,
-                 requestAttributes)) {
-             return true;
 -    private static Map<String, EvalResources> resourceEvaluators;
 +        } else {
-             LOGGER.info("Offer not sufficient for task with, cpu: {}, memory: 
{}", taskUtils.getAggregateCpus(profile), 
taskUtils.getAggregateMemory(profile));
-             return false;
-         }
-     }
++          LOGGER.error("ports resource was not Ranges: {}", resource
++              .getType().toString());
  
 -    static {
 -        resourceEvaluators = new HashMap<String, EvalResources>(4);
 -        resourceEvaluators.put("cpus", new EvalResources() {
 -            public void eval(Resource resource, Map<String, Object> results) {
 -                results.put("cpus", scalarToDouble(resource, "cpus"));
 -            }
 -        });
 -        resourceEvaluators.put("mem", new EvalResources() {
 -            public void eval(Resource resource, Map<String, Object> results) {
 -                results.put("mem", scalarToDouble(resource, "mem"));
 -            }
 -        });
 -        resourceEvaluators.put("disk", new EvalResources() {
 -            public void eval(Resource resource, Map<String, Object> results) {
 -            }
 -        });
 -        resourceEvaluators.put("ports", new EvalResources() {
 -            public void eval(Resource resource, Map<String, Object> results) {
 -                int ports = 0;
 -                if (resource.getType().equals(Value.Type.RANGES)) {
 -                    Value.Ranges ranges = resource.getRanges();
 -                    for (Value.Range range : ranges.getRangeList()) {
 -                        if (range.getBegin() < range.getEnd()) {
 -                            ports += range.getEnd() - range.getBegin() + 1;
 -                        }
 -                    }
 -
 -                } else {
 -                    LOGGER.error("ports resource was not Ranges: {}", resource
 -                            .getType().toString());
 -
 -                }
 -                results.put("ports", Integer.valueOf(ports));
 -            }
 -        });
 -    }
++        }
++        results.put("ports", Integer.valueOf(ports));
++      }
++    });
++  }
  }

Reply via email to