http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
new file mode 100644
index 0000000..23e9798
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.myriad.scheduler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.myriad.configuration.MyriadConfiguration;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+/**
+ * Executor command line generator for deployments where the NodeManager
+ * binaries are downloaded as a tarball instead of being pre-installed.
+ * The generated command line extracts the tarball named by the last path
+ * segment of {@code nodeManagerUri}, copies the fetched {@code conf} file to
+ * {@code YARN_HOME/etc/hadoop/yarn-site.xml} and ends with {@code YARN_NM_CMD}.
+ */
+public class DownloadNMExecutorCLGenImpl extends NMExecutorCLGenImpl {
+
+  private static final Logger LOGGER = LoggerFactory.
+      getLogger(DownloadNMExecutorCLGenImpl.class);
+  /** URI of the NodeManager tarball; only its file name part is used when extracting. */
+  private final String nodeManagerUri;
+  /**
+   * @param cfg            Myriad configuration, handed to the parent class
+   * @param nodeManagerUri URI of the NodeManager tarball
+   */
+  public DownloadNMExecutorCLGenImpl(MyriadConfiguration cfg, String 
nodeManagerUri) {
+    super(cfg);
+    this.nodeManagerUri = nodeManagerUri;
+  }
+  /**
+   * Builds the executor command line. After calling
+   * {@code generateEnvironment}, the pieces are appended in a fixed order:
+   * {@code appendDistroExtractionCommands}, {@code appendCgroupsCmds},
+   * {@code appendYarnHomeExport}, {@code appendUser}, {@code appendEnvForNM}
+   * and finally the {@code YARN_NM_CMD} constant.
+   *
+   * @param profile resource profile passed on to {@code generateEnvironment}
+   * @param ports   ports for the task; cast to {@code NMPorts}, so any other
+   *                type fails with a {@code ClassCastException}
+   * @return the assembled command line
+   */
+  @Override
+  public String generateCommandLine(ServiceResourceProfile profile, Ports 
ports) {
+    StringBuilder cmdLine = new StringBuilder();
+    LOGGER.info("Using remote distribution");
+
+    generateEnvironment(profile, (NMPorts) ports);
+    appendDistroExtractionCommands(cmdLine);
+    appendCgroupsCmds(cmdLine);
+    appendYarnHomeExport(cmdLine);
+    appendUser(cmdLine);
+    appendEnvForNM(cmdLine);
+    cmdLine.append(YARN_NM_CMD);
+    return cmdLine.toString();
+  }
+  /**
+   * Appends the commands that unpack the downloaded distribution, hand the
+   * current directory to the framework user and copy the fetched {@code conf}
+   * file to {@code YARN_HOME/etc/hadoop/yarn-site.xml}. The appended text ends
+   * with a {@code ;} so that further commands can follow.
+   *
+   * @param cmdLine builder the commands are appended to
+   */
+  protected void appendDistroExtractionCommands(StringBuilder cmdLine) {
+    /*
+    TODO(darinj): Overall this is messier than I'd like. We can't let mesos
+    untar the distribution, since it will change the permissions. Instead we
+    simply download the tarball and execute tar -zxpf. We also pull the config
+    from the resource manager and put it in the conf dir. This is also why we
+    need frameworkSuperUser. This will be refactored after Mesos-1790 is
+    resolved.
+   */
+
+    //TODO(DarinJ) support other compression, as this is a temp fix for Mesos 
1760 may not get to it.
+    //Extract tarball keeping permissions, necessary to keep 
HADOOP_HOME/bin/container-executor suidbit set.
+    cmdLine.append("sudo tar -zxpf ").append(getFileName(nodeManagerUri));
+
+    //We need the current directory to be writable by frameworkUser for 
capsuleExecutor to create directories.
+    //Best to simply give ownership to the user running the executor but we 
don't want to use -R as this
+    //will silently remove the suid bit on container executor.
+    cmdLine.append(" && sudo chown 
").append(cfg.getFrameworkUser().get()).append(" .");
+
+    //Place the hadoop config in the HADOOP_CONF_DIR where it will be 
read by the NodeManager
+    //The url for the resource manager config is: http(s)://hostname:port/conf 
so fetcher.cpp downloads the
+    //config file to conf, It's an xml file with the parameters of 
yarn-site.xml, core-site.xml and hdfs.xml.
+    cmdLine.append(" && cp conf 
").append(cfg.getYarnEnvironment().get("YARN_HOME")).append("/etc/hadoop/yarn-site.xml;");
+  }
+  /**
+   * Appends a {@code sudo -E -u USER -H} prefix, where USER is the configured
+   * framework user, so that whatever is appended next runs as that user.
+   * NOTE(review): assumes the framework user is present in the configuration;
+   * what {@code get()} does when it is absent is not visible here.
+   *
+   * @param cmdLine builder the prefix is appended to
+   */
+  protected void appendUser(StringBuilder cmdLine) {
+    cmdLine.append(" sudo -E -u 
").append(cfg.getFrameworkUser().get()).append(" -H");
+  }
+  /**
+   * Returns the part of {@code uri} after its last {@code '/'}, or the whole
+   * string when it contains no slash.
+   *
+   * @param uri URI or path to take the file name from
+   * @return the trailing file name
+   * @throws IllegalArgumentException if {@code uri} ends with a slash, leaving
+   *                                  an empty file name
+   */
+  private static String getFileName(String uri) {
+    int lastSlash = uri.lastIndexOf('/');
+    if (lastSlash == -1) {
+      return uri;
+    } else {
+      String fileName = uri.substring(lastSlash + 1);
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(fileName), "URI 
should not have a slash at the end");
+      return fileName;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java
new file mode 100644
index 0000000..82782f2
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.myriad.scheduler;
+
+/**
+ * Interface that allows multiple implementations of executor command line
+ * generation to be plugged in.
+ * <p/>
+ * {@code generateCommandLine} returns the command line for the given resource
+ * profile and ports.
+ */
+public interface ExecutorCommandLineGenerator {
+  String generateCommandLine(ServiceResourceProfile profile, Ports ports);
+  /** NOTE(review): presumably the URL the configuration is fetched from; confirm against the implementations. */
+  String getConfigurationUrl();
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java
new file mode 100644
index 0000000..8119360
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import com.google.gson.Gson;
+
+/**
+ * Extended ServiceResourceProfile for services that need to pass a set of
+ * resources downstream; currently the only such service is NodeManager.
+ * <p/>
+ * The name, cpus and memory getters report the values of the wrapped child
+ * profile, while the aggregate getters add the {@code cpus} / {@code memory}
+ * fields of this profile to the child's values.
+ */
+public class ExtendedResourceProfile extends ServiceResourceProfile {
+  /** Wrapped profile; dereferenced by every getter, so it is expected to be non-null. */
+  private NMProfile childProfile;
+
+  /**
+   * @param childProfile profile to wrap; must not be null, because its name is
+   *                     read immediately and a null value results in a
+   *                     NullPointerException
+   * @param cpu          cpu value passed to the parent constructor
+   * @param mem          memory value passed to the parent constructor
+   */
+  public ExtendedResourceProfile(NMProfile childProfile, Double cpu, Double 
mem) {
+    super(childProfile.getName(), cpu, mem);
+    this.childProfile = childProfile;
+    this.className = ExtendedResourceProfile.class.getName();
+  }
+  /** Returns the wrapped child profile. */
+  public NMProfile getChildProfile() {
+    return childProfile;
+  }
+  /** Replaces the wrapped child profile; no null check is performed here. */
+  public void setChildProfile(NMProfile nmProfile) {
+    this.childProfile = nmProfile;
+  }
+  /** Returns the name of the child profile. */
+  @Override
+  public String getName() {
+    return childProfile.getName();
+  }
+  /** Returns the cpus of the child profile as a Double. */
+  @Override
+  public Double getCpus() {
+    return childProfile.getCpus().doubleValue();
+  }
+  /** Returns the memory of the child profile as a Double. */
+  @Override
+  public Double getMemory() {
+    return childProfile.getMemory().doubleValue();
+  }
+  /** Returns this profile's {@code memory} field plus the child profile's memory. */
+  @Override
+  public Double getAggregateMemory() {
+    return memory + childProfile.getMemory();
+  }
+  /** Returns this profile's {@code cpus} field plus the child profile's cpus. */
+  @Override
+  public Double getAggregateCpu() {
+    return cpus + childProfile.getCpus();
+  }
+  /** Returns the JSON produced by Gson for this object. */
+  @Override
+  public String toString() {
+    Gson gson = new Gson();
+    return gson.toJson(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java
new file mode 100644
index 0000000..741c6de
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import javax.inject.Inject;
+
+import org.apache.mesos.Protos.Status;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Driver for Myriad scheduler. Thin wrapper around a Mesos
+ * {@code SchedulerDriver} that logs before and after each delegated call and
+ * returns the status reported by the wrapped driver.
+ */
+public class MyriadDriver {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MyriadDriver.class);
+
+  private final SchedulerDriver driver;
+  /** @param driver the scheduler driver all calls are delegated to */
+  @Inject
+  public MyriadDriver(SchedulerDriver driver) {
+    this.driver = driver;
+  }
+  /** Calls {@code start()} on the wrapped driver and returns its status. */
+  public Status start() {
+    LOGGER.info("Starting driver");
+    Status status = driver.start();
+    LOGGER.info("Driver started with status: {}", status);
+    return status;
+  }
+  /**
+   * Calls {@code killTask} on the wrapped driver for the given task.
+   *
+   * @param taskId id of the task to kill
+   * @return the status returned by the wrapped driver
+   */
+  public Status kill(final TaskID taskId) {
+    LOGGER.info("Killing task {}", taskId);
+    Status status = driver.killTask(taskId);
+    LOGGER.info("Task {} killed with status: {}", taskId, status);
+    return status;
+  }
+  /** Calls {@code abort()} on the wrapped driver and returns its status. */
+  public Status abort() {
+    LOGGER.info("Aborting driver");
+    Status status = driver.abort();
+    LOGGER.info("Driver aborted with status: {}", status);
+    return status;
+  }
+  /** Returns the wrapped scheduler driver. */
+  public SchedulerDriver getDriver() {
+    return driver;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriverManager.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriverManager.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriverManager.java
new file mode 100644
index 0000000..44d7e06
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriverManager.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import com.google.common.base.Preconditions;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.Status;
+import org.apache.mesos.Protos.TaskID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Manager for the myriad scheduler driver. Tracks the last status returned by
+ * the driver and performs start, abort and kill while holding
+ * {@code driverLock}. The status starts out as {@code DRIVER_NOT_STARTED}.
+ */
+public class MyriadDriverManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MyriadDriverManager.class);
+  private final Lock driverLock;
+  private MyriadDriver driver;
+  private Status driverStatus;
+  /** @param driver the driver to manage */
+  @Inject
+  public MyriadDriverManager(MyriadDriver driver) {
+    this.driver = driver;
+    this.driverLock = new ReentrantLock();
+    this.driverStatus = Protos.Status.DRIVER_NOT_STARTED;
+  }
+  /**
+   * Starts the driver and records the status it returns.
+   *
+   * @return the recorded driver status (read after the lock is released)
+   * @throws IllegalStateException if the driver is null or the recorded status
+   *                               is not {@code DRIVER_NOT_STARTED}
+   */
+  public Status startDriver() {
+    this.driverLock.lock();
+    try {
+      Preconditions.checkState(this.isStartable());
+      LOGGER.info("Starting driver...");
+      this.driverStatus = driver.start();
+      LOGGER.info("Driver started with status: {}", this.driverStatus);
+    } finally {
+      this.driverLock.unlock();
+    }
+    return this.driverStatus;
+  }
+  /**
+   * Aborts the driver if the recorded status is {@code DRIVER_RUNNING};
+   * otherwise does nothing.
+   *
+   * @return the recorded driver status (read after the lock is released)
+   */
+  public Status stopDriver() {
+    this.driverLock.lock();
+    try {
+      if (isRunning()) {
+        LOGGER.info("Aborting driver...");
+        this.driverStatus = this.driver.abort();
+        LOGGER.info("Aborted driver with status: {}", this.driverStatus);
+      }
+    } finally {
+      this.driverLock.unlock();
+    }
+    return driverStatus;
+  }
+  /**
+   * Kills the given task if the recorded status is {@code DRIVER_RUNNING};
+   * otherwise only logs a warning. The status returned by the kill call
+   * replaces the recorded driver status.
+   *
+   * @param taskId id of the task to kill
+   * @return the recorded driver status (read after the lock is released)
+   */
+  public Status kill(final TaskID taskId) {
+    LOGGER.info("Killing task {}", taskId);
+    this.driverLock.lock();
+    try {
+      if (isRunning()) {
+        this.driverStatus = driver.kill(taskId);
+        LOGGER.info("Task {} killed with status: {}", taskId, 
this.driverStatus);
+      } else {
+        LOGGER.warn("Cannot kill task, driver is not running");
+      }
+    } finally {
+      this.driverLock.unlock();
+    }
+
+    return driverStatus;
+  }
+  /** Returns the recorded driver status; the lock is not taken for this read. */
+  public Status getDriverStatus() {
+    return this.driverStatus;
+  }
+  /** True when a driver is present and the recorded status is {@code DRIVER_NOT_STARTED}. */
+  private boolean isStartable() {
+    return this.driver != null && this.driverStatus == 
Status.DRIVER_NOT_STARTED;
+  }
+  /** True when a driver is present and the recorded status is {@code DRIVER_RUNNING}. */
+  private boolean isRunning() {
+    return this.driver != null && this.driverStatus == Status.DRIVER_RUNNING;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
new file mode 100644
index 0000000..6b0de6d
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import org.apache.myriad.configuration.MyriadBadConfigurationException;
+import org.apache.myriad.configuration.ServiceConfiguration;
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.configuration.NodeManagerConfiguration;
+import org.apache.myriad.policy.NodeScaleDownPolicy;
+import org.apache.myriad.scheduler.constraints.Constraint;
+import org.apache.myriad.scheduler.constraints.LikeConstraint;
+import org.apache.myriad.state.NodeTask;
+import org.apache.myriad.state.SchedulerState;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Myriad scheduler operations: flexing the number of NodeManager instances
+ * and of configured services up and down. Flex-up adds new {@code NodeTask}s
+ * to the scheduler state; flex-down marks existing tasks as killable, taking
+ * pending tasks first, then staging tasks, then active tasks.
+ */
+public class MyriadOperations {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MyriadOperations.class);
+  private final SchedulerState schedulerState;
+
+  private MyriadConfiguration cfg;
+  private NodeScaleDownPolicy nodeScaleDownPolicy;
+  /**
+   * @param cfg                 configuration used to look up service settings
+   * @param schedulerState      state that tasks are added to and marked killable in
+   * @param nodeScaleDownPolicy policy applied to the active task list before
+   *                            active NodeManagers are flexed down
+   */
+  @Inject
+  public MyriadOperations(MyriadConfiguration cfg, SchedulerState 
schedulerState, NodeScaleDownPolicy nodeScaleDownPolicy) {
+    this.cfg = cfg;
+    this.schedulerState = schedulerState;
+    this.nodeScaleDownPolicy = nodeScaleDownPolicy;
+  }
+  /**
+   * Adds {@code instances} new NodeManager tasks, each created with the given
+   * profile and constraint and the {@code NM_TASK_PREFIX} task prefix, to the
+   * scheduler state.
+   *
+   * @param serviceResourceProfile profile for the new tasks
+   * @param instances              number of tasks to create
+   * @param constraint             constraint attached to every new task
+   */
+  public void flexUpCluster(ServiceResourceProfile serviceResourceProfile, int 
instances, Constraint constraint) {
+    Collection<NodeTask> nodes = new HashSet<>();
+    for (int i = 0; i < instances; i++) {
+      NodeTask nodeTask = new NodeTask(serviceResourceProfile, constraint);
+      nodeTask.setTaskPrefix(NodeManagerConfiguration.NM_TASK_PREFIX);
+      nodes.add(nodeTask);
+    }
+
+    LOGGER.info("Adding {} NM instances to cluster", nodes.size());
+    this.schedulerState.addNodes(nodes);
+  }
+  /**
+   * Marks up to {@code numInstancesToScaleDown} NodeManager tasks matching the
+   * profile and constraint as killable. Pending tasks are taken first, then
+   * staging tasks, then active tasks; each stage only receives the count that
+   * the earlier stages did not cover. The outcome is logged.
+   *
+   * @param serviceResourceProfile  profile whose tasks are flexed down
+   * @param constraint              constraint tasks must meet; may be null
+   * @param numInstancesToScaleDown maximum number of tasks to flex down
+   */
+  public void flexDownCluster(ServiceResourceProfile serviceResourceProfile, 
Constraint constraint, int numInstancesToScaleDown) {
+    // Flex down Pending tasks, if any
+    int numPendingTasksScaledDown = 
flexDownPendingTasks(serviceResourceProfile, constraint, 
numInstancesToScaleDown);
+
+    // Flex down Staging tasks, if any
+    int numStagingTasksScaledDown = 
flexDownStagingTasks(serviceResourceProfile, constraint, 
numInstancesToScaleDown - numPendingTasksScaledDown);
+
+    // Flex down Active tasks, if any
+    int numActiveTasksScaledDown = flexDownActiveTasks(serviceResourceProfile, 
constraint, numInstancesToScaleDown - numPendingTasksScaledDown - 
numStagingTasksScaledDown);
+
+    if (numActiveTasksScaledDown + numStagingTasksScaledDown + 
numPendingTasksScaledDown == 0) {
+      LOGGER.info("No Node Managers with profile '{}' and constraint '{}' 
found for scaling down.", serviceResourceProfile.getName(), constraint == null 
? "null" : constraint.toString());
+    } else {
+      LOGGER.info("Flexed down {} active, {} staging  and {} pending Node 
Managers with " + "'{}' profile and constraint '{}'.", 
numActiveTasksScaledDown, numStagingTasksScaledDown, numPendingTasksScaledDown, 
serviceResourceProfile.getName(),
+          constraint == null ? "null" : constraint.toString());
+    }
+  }
+
+  /**
+   * Flexes up a configured service by adding new tasks to the scheduler
+   * state. Each task gets a profile built from the service's configured cpus
+   * and JVM max memory (or the {@code ServiceConfiguration} defaults), a null
+   * constraint, and the service name as its task prefix.
+   *
+   * @param instances   number of instances to add
+   * @param serviceName name of the service, as used in the configuration
+   * @throws MyriadBadConfigurationException if a positive max instance count
+   *         is configured and the requested instances plus the existing
+   *         active, staging and pending ones would exceed it
+   */
+  public void flexUpAService(int instances, String serviceName) throws 
MyriadBadConfigurationException {
+    final ServiceConfiguration auxTaskConf = 
cfg.getServiceConfiguration(serviceName);
+
+    int totalflexInstances = instances + getFlexibleInstances(serviceName);
+    Integer maxInstances = auxTaskConf.getMaxInstances().orNull();
+    if (maxInstances != null && maxInstances > 0) {
+      // check number of instances
+      // sum of active, staging, pending and requested must be <= maxInstances
+      if (totalflexInstances > maxInstances) {
+        LOGGER.error("Current number of active, staging, pending and requested 
instances: {}" + ", while it is greater then max instances allowed: {}", 
totalflexInstances, maxInstances);
+        throw new MyriadBadConfigurationException("Current number of active, 
staging, pending instances and requested: " + totalflexInstances + ", while it 
is greater then max instances allowed: " + maxInstances);
+      }
+    }
+
+    final Double cpu = 
auxTaskConf.getCpus().or(ServiceConfiguration.DEFAULT_CPU);
+    final Double mem = 
auxTaskConf.getJvmMaxMemoryMB().or(ServiceConfiguration.DEFAULT_MEMORY);
+
+    Collection<NodeTask> nodes = new HashSet<>();
+    for (int i = 0; i < instances; i++) {
+      NodeTask nodeTask = new NodeTask(new ServiceResourceProfile(serviceName, 
cpu, mem), null);
+      nodeTask.setTaskPrefix(serviceName);
+      nodes.add(nodeTask);
+    }
+
+    LOGGER.info("Adding {} {} instances to cluster", nodes.size(), 
serviceName);
+    this.schedulerState.addNodes(nodes);
+  }
+
+  /**
+   * Flexes down any service defined in the configuration by marking its
+   * tasks as killable. Pending tasks are taken first, then staging tasks,
+   * then active tasks, stopping as soon as the requested count is reached.
+   * Fewer tasks are marked when the service does not have that many.
+   *
+   * @param numInstancesToScaleDown maximum number of instances to flex down
+   * @param serviceName             - name of the service
+   */
+  public void flexDownAService(int numInstancesToScaleDown, String 
serviceName) {
+    LOGGER.info("About to flex down {} instances of {}", 
numInstancesToScaleDown, serviceName);
+
+    int numScaledDown = 0;
+
+    // Flex down Pending tasks, if any
+    if (numScaledDown < numInstancesToScaleDown) {
+      Collection<Protos.TaskID> pendingTasks = 
this.schedulerState.getPendingTaskIds(serviceName);
+
+      for (Protos.TaskID taskId : pendingTasks) {
+        this.schedulerState.makeTaskKillable(taskId);
+        numScaledDown++;
+        if (numScaledDown >= numInstancesToScaleDown) {
+          break;
+        }
+      }
+    }
+    int numPendingTasksScaledDown = numScaledDown;
+
+    // Flex down Staging tasks, if any
+    if (numScaledDown < numInstancesToScaleDown) {
+      Collection<Protos.TaskID> stagingTasks = 
this.schedulerState.getStagingTaskIds(serviceName);
+
+      for (Protos.TaskID taskId : stagingTasks) {
+        this.schedulerState.makeTaskKillable(taskId);
+        numScaledDown++;
+        if (numScaledDown >= numInstancesToScaleDown) {
+          break;
+        }
+      }
+    }
+    int numStagingTasksScaledDown = numScaledDown - numPendingTasksScaledDown;
+
+    Set<NodeTask> activeTasks = 
this.schedulerState.getActiveTasksByType(serviceName);
+    if (numScaledDown < numInstancesToScaleDown) {
+      for (NodeTask nodeTask : activeTasks) {
+        
this.schedulerState.makeTaskKillable(nodeTask.getTaskStatus().getTaskId());
+        numScaledDown++;
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("Marked NodeTask {} on host {} for kill.", 
nodeTask.getTaskStatus().getTaskId(), nodeTask.getHostname());
+        }
+        if (numScaledDown >= numInstancesToScaleDown) {
+          break;
+        }
+      }
+    }
+
+    LOGGER.info("Flexed down {} of {} instances including {} staging 
instances, and {} pending instances of {}", numScaledDown, 
numInstancesToScaleDown, numStagingTasksScaledDown, numPendingTasksScaledDown, 
serviceName);
+  }
+  /** Flexes down pending tasks of the profile; returns 0 without any lookup when the count is not positive. */
+  private int flexDownPendingTasks(ServiceResourceProfile profile, Constraint 
constraint, int numInstancesToScaleDown) {
+    return numInstancesToScaleDown > 0 ? 
flexDownTasks(schedulerState.getPendingTaskIDsForProfile(profile), profile, 
constraint, numInstancesToScaleDown) : 0;
+  }
+  /** Flexes down staging tasks of the profile; returns 0 without any lookup when the count is not positive. */
+  private int flexDownStagingTasks(ServiceResourceProfile profile, Constraint 
constraint, int numInstancesToScaleDown) {
+    return numInstancesToScaleDown > 0 ? 
flexDownTasks(schedulerState.getStagingTaskIDsForProfile(profile), profile, 
constraint, numInstancesToScaleDown) : 0;
+  }
+  /**
+   * Flexes down active tasks of the profile. The active task ids are copied
+   * into a list that is handed to {@code nodeScaleDownPolicy.apply} before
+   * the tasks are selected. NOTE(review): presumably the policy reorders the
+   * list in place to set the kill order; confirm against the policy.
+   *
+   * @return number of tasks marked killable, 0 when the count is not positive
+   */
+  private int flexDownActiveTasks(ServiceResourceProfile profile, Constraint 
constraint, int numInstancesToScaleDown) {
+    if (numInstancesToScaleDown > 0) {
+      List<Protos.TaskID> activeTasksForProfile = 
Lists.newArrayList(schedulerState.getActiveTaskIDsForProfile(profile));
+      nodeScaleDownPolicy.apply(activeTasksForProfile);
+      return flexDownTasks(activeTasksForProfile, profile, constraint, 
numInstancesToScaleDown);
+    }
+    return 0;
+  }
+  /**
+   * Walks the given task ids in iteration order and marks as killable every
+   * task whose profile name equals that of {@code profile} and which meets
+   * the constraint, stopping once {@code numInstancesToScaleDown} tasks have
+   * been marked.
+   *
+   * @return number of tasks marked killable
+   */
+  private int flexDownTasks(Collection<Protos.TaskID> taskIDs, 
ServiceResourceProfile profile, Constraint constraint, int 
numInstancesToScaleDown) {
+    int numInstancesScaledDown = 0;
+    for (Protos.TaskID taskID : taskIDs) {
+      NodeTask nodeTask = schedulerState.getTask(taskID);
+      if (nodeTask.getProfile().getName().equals(profile.getName()) && 
meetsConstraint(nodeTask, constraint)) {
+        this.schedulerState.makeTaskKillable(taskID);
+        numInstancesScaledDown++;
+        if (numInstancesScaledDown == numInstancesToScaleDown) {
+          break;
+        }
+      }
+    }
+    return numInstancesScaledDown;
+  }
+  /**
+   * Decides whether a task satisfies a constraint. A null constraint matches
+   * every task, and a constraint equal to the task's own constraint matches.
+   * Otherwise a LIKE constraint is evaluated against the task's host name or
+   * slave attributes, and any other constraint type does not match.
+   */
+  private boolean meetsConstraint(NodeTask nodeTask, Constraint constraint) {
+    if (constraint != null) {
+      if (constraint.equals(nodeTask.getConstraint())) {
+        return true;
+      }
+      switch (constraint.getType()) {
+        case LIKE:
+          LikeConstraint likeConstraint = (LikeConstraint) constraint;
+          if (likeConstraint.isConstraintOnHostName()) {
+            return likeConstraint.matchesHostName(nodeTask.getHostname());
+          } else {
+            return 
likeConstraint.matchesSlaveAttributes(nodeTask.getSlaveAttributes());
+          }
+
+        default:
+          return false;
+      }
+    }
+    return true;
+  }
+  /** Returns the combined number of active, staging and pending task ids for the given task prefix. */
+  public Integer getFlexibleInstances(String taskPrefix) {
+    return this.schedulerState.getActiveTaskIds(taskPrefix).size() + 
this.schedulerState.getStagingTaskIds(taskPrefix).size() + 
this.schedulerState.getPendingTaskIds(taskPrefix).size();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java
new file mode 100644
index 0000000..235d01b
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.scheduler.event.ErrorEvent;
+import org.apache.myriad.scheduler.event.FrameworkMessageEvent;
+import org.apache.myriad.scheduler.event.OfferRescindedEvent;
+import org.apache.myriad.scheduler.event.ReRegisteredEvent;
+import org.apache.myriad.scheduler.event.ResourceOffersEvent;
+import org.apache.myriad.scheduler.event.SlaveLostEvent;
+import org.apache.myriad.scheduler.event.StatusUpdateEvent;
+import com.lmax.disruptor.EventTranslator;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import javax.inject.Inject;
+import java.util.List;
+
+/**
+ * Myriad Scheduler. Implements the Mesos {@code Scheduler} callbacks by
+ * publishing one event per callback on the matching disruptor obtained from
+ * the {@code DisruptorManager}; each event is filled with the callback's
+ * arguments. No other work is done in the callbacks themselves.
+ */
+public class MyriadScheduler implements Scheduler {
+  private org.apache.myriad.DisruptorManager disruptorManager;
+  /**
+   * @param cfg              not used by this class
+   * @param disruptorManager source of the disruptors the callbacks publish to
+   */
+  @Inject
+  public MyriadScheduler(final MyriadConfiguration cfg, final 
org.apache.myriad.DisruptorManager disruptorManager) {
+    this.disruptorManager = disruptorManager;
+  }
+  /** Publishes a RegisteredEvent carrying the driver, framework id and master info. */
+  @Override
+  public void registered(final SchedulerDriver driver, final 
Protos.FrameworkID frameworkId, final Protos.MasterInfo masterInfo) {
+    disruptorManager.getRegisteredEventDisruptor().publishEvent(new 
EventTranslator<org.apache.myriad.scheduler.event.RegisteredEvent>() {
+      @Override
+      public void 
translateTo(org.apache.myriad.scheduler.event.RegisteredEvent event, long 
sequence) {
+        event.setDriver(driver);
+        event.setFrameworkId(frameworkId);
+        event.setMasterInfo(masterInfo);
+      }
+    });
+  }
+  /** Publishes a ReRegisteredEvent carrying the driver and master info. */
+  @Override
+  public void reregistered(final SchedulerDriver driver, final 
Protos.MasterInfo masterInfo) {
+    disruptorManager.getReRegisteredEventDisruptor().publishEvent(new 
EventTranslator<ReRegisteredEvent>() {
+      @Override
+      public void translateTo(ReRegisteredEvent event, long sequence) {
+        event.setDriver(driver);
+        event.setMasterInfo(masterInfo);
+      }
+    });
+  }
+  /** Publishes a ResourceOffersEvent carrying the driver and the offers. */
+  @Override
+  public void resourceOffers(final SchedulerDriver driver, final 
List<Protos.Offer> offers) {
+    disruptorManager.getResourceOffersEventDisruptor().publishEvent(new 
EventTranslator<ResourceOffersEvent>() {
+      @Override
+      public void translateTo(ResourceOffersEvent event, long sequence) {
+        event.setDriver(driver);
+        event.setOffers(offers);
+      }
+    });
+  }
+  /** Publishes an OfferRescindedEvent carrying the driver and the offer id. */
+  @Override
+  public void offerRescinded(final SchedulerDriver driver, final 
Protos.OfferID offerId) {
+    disruptorManager.getOfferRescindedEventDisruptor().publishEvent(new 
EventTranslator<OfferRescindedEvent>() {
+      @Override
+      public void translateTo(OfferRescindedEvent event, long sequence) {
+        event.setDriver(driver);
+        event.setOfferId(offerId);
+      }
+    });
+  }
+  /** Publishes a StatusUpdateEvent carrying the driver and the task status. */
+  @Override
+  public void statusUpdate(final SchedulerDriver driver, final 
Protos.TaskStatus status) {
+    disruptorManager.getStatusUpdateEventDisruptor().publishEvent(new 
EventTranslator<StatusUpdateEvent>() {
+      @Override
+      public void translateTo(StatusUpdateEvent event, long sequence) {
+        event.setDriver(driver);
+        event.setStatus(status);
+      }
+    });
+  }
+  /** Publishes a FrameworkMessageEvent carrying the driver, message bytes, executor id and slave id. */
+  @Override
+  public void frameworkMessage(final SchedulerDriver driver, final 
Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final byte[] bytes) 
{
+    disruptorManager.getFrameworkMessageEventDisruptor().publishEvent(new 
EventTranslator<FrameworkMessageEvent>() {
+      @Override
+      public void translateTo(FrameworkMessageEvent event, long sequence) {
+        event.setDriver(driver);
+        event.setBytes(bytes);
+        event.setExecutorId(executorId);
+        event.setSlaveId(slaveId);
+      }
+    });
+  }
+  /** Publishes a DisconnectedEvent carrying the driver. */
+  @Override
+  public void disconnected(final SchedulerDriver driver) {
+    disruptorManager.getDisconnectedEventDisruptor().publishEvent(new 
EventTranslator<org.apache.myriad.scheduler.event.DisconnectedEvent>() {
+      @Override
+      public void 
translateTo(org.apache.myriad.scheduler.event.DisconnectedEvent event, long 
sequence) {
+        event.setDriver(driver);
+      }
+    });
+  }
+  /** Publishes a SlaveLostEvent carrying the driver and the slave id. */
+  @Override
+  public void slaveLost(final SchedulerDriver driver, final Protos.SlaveID 
slaveId) {
+    disruptorManager.getSlaveLostEventDisruptor().publishEvent(new 
EventTranslator<SlaveLostEvent>() {
+      @Override
+      public void translateTo(SlaveLostEvent event, long sequence) {
+        event.setDriver(driver);
+        event.setSlaveId(slaveId);
+      }
+    });
+  }
+  /** Publishes an ExecutorLostEvent carrying the driver, executor id, slave id and exit status. */
+  @Override
+  public void executorLost(final SchedulerDriver driver, final 
Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final int 
exitStatus) {
+    disruptorManager.getExecutorLostEventDisruptor().publishEvent(new 
EventTranslator<org.apache.myriad.scheduler.event.ExecutorLostEvent>() {
+      @Override
+      public void 
translateTo(org.apache.myriad.scheduler.event.ExecutorLostEvent event, long 
sequence) {
+        event.setDriver(driver);
+        event.setExecutorId(executorId);
+        event.setSlaveId(slaveId);
+        event.setExitStatus(exitStatus);
+      }
+    });
+  }
+  /** Publishes an ErrorEvent carrying the driver and the error message. */
+  @Override
+  public void error(final SchedulerDriver driver, final String message) {
+    disruptorManager.getErrorEventDisruptor().publishEvent(new 
EventTranslator<ErrorEvent>() {
+      @Override
+      public void translateTo(ErrorEvent event, long sequence) {
+        event.setDriver(driver);
+        event.setMessage(message);
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java
new file mode 100644
index 0000000..411518f
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.myriad.scheduler;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.myriad.configuration.MyriadConfiguration;
+
+/**
+ * Implementation assumes NM binaries already deployed
+ */
+public class NMExecutorCLGenImpl implements ExecutorCommandLineGenerator {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(NMExecutorCLGenImpl.class);
+
+  public static final String ENV_YARN_NODEMANAGER_OPTS = "YARN_NODEMANAGER_OPTS";
+  public static final String KEY_YARN_NM_CGROUPS_PATH = "yarn.nodemanager.cgroups.path";
+  public static final String KEY_YARN_RM_HOSTNAME = "yarn.resourcemanager.hostname";
+
+  /**
+   * YARN container executor class.
+   */
+  public static final String KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS = "yarn.nodemanager.container-executor.class";
+  // TODO (mohit): Should it be configurable ?
+  public static final String VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS =
+      "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor";
+  public static final String DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS =
+      "org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor";
+
+  /**
+   * YARN class to help handle LCE resources
+   */
+  public static final String KEY_YARN_NM_LCE_RH_CLASS =
+      "yarn.nodemanager.linux-container-executor.resources-handler.class";
+
+  // TODO (mohit): Should it be configurable ?
+  public static final String VAL_YARN_NM_LCE_RH_CLASS =
+      "org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler";
+  public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY =
+      "yarn.nodemanager.linux-container-executor.cgroups.hierarchy";
+  public static final String VAL_YARN_NM_LCE_CGROUPS_HIERARCHY = "mesos/$TASK_DIR";
+  public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT =
+      "yarn.nodemanager.linux-container-executor.cgroups.mount";
+  public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH =
+      "yarn.nodemanager.linux-container-executor.cgroups.mount-path";
+  public static final String VAL_YARN_NM_LCE_CGROUPS_MOUNT_PATH = "/sys/fs/cgroup";
+  public static final String KEY_YARN_NM_LCE_GROUP = "yarn.nodemanager.linux-container-executor.group";
+  public static final String KEY_YARN_NM_LCE_PATH = "yarn.nodemanager.linux-container-executor.path";
+  public static final String KEY_YARN_HOME = "yarn.home";
+  public static final String KEY_NM_RESOURCE_CPU_VCORES = "nodemanager.resource.cpu-vcores";
+  public static final String KEY_NM_RESOURCE_MEM_MB = "nodemanager.resource.memory-mb";
+  public static final String YARN_NM_CMD = " $YARN_HOME/bin/yarn nodemanager";
+  public static final String KEY_NM_ADDRESS = "myriad.yarn.nodemanager.address";
+  public static final String KEY_NM_LOCALIZER_ADDRESS = "myriad.yarn.nodemanager.localizer.address";
+  public static final String KEY_NM_WEBAPP_ADDRESS = "myriad.yarn.nodemanager.webapp.address";
+  public static final String KEY_NM_SHUFFLE_PORT = "myriad.mapreduce.shuffle.port";
+
+  private static final String ALL_LOCAL_IPV4ADDR = "0.0.0.0:";
+  private static final String PROPERTY_FORMAT = "-D%s=%s";
+
+  /**
+   * Environment variables emitted in front of the NM command; rebuilt by every
+   * {@link #generateEnvironment(ServiceResourceProfile, NMPorts)} call.
+   */
+  private final Map<String, String> environment = new HashMap<>();
+  protected MyriadConfiguration cfg;
+
+  public NMExecutorCLGenImpl(MyriadConfiguration cfg) {
+    this.cfg = cfg;
+  }
+
+  /**
+   * Builds the shell command line that launches the NodeManager: optional cgroups
+   * setup, optional YARN_HOME export, an {@code env} prefix carrying the generated
+   * environment, and finally {@link #YARN_NM_CMD}.
+   *
+   * @param profile resource profile whose cpus/memory are passed to the NodeManager
+   * @param ports   ports for the NodeManager; cast to {@link NMPorts}
+   * @return the assembled command line
+   */
+  @Override
+  public String generateCommandLine(ServiceResourceProfile profile, Ports ports) {
+    StringBuilder cmdLine = new StringBuilder();
+
+    generateEnvironment(profile, (NMPorts) ports);
+    appendCgroupsCmds(cmdLine);
+    appendYarnHomeExport(cmdLine);
+    appendEnvForNM(cmdLine);
+    cmdLine.append(YARN_NM_CMD);
+    return cmdLine.toString();
+  }
+
+  /**
+   * Populates the environment map from the configured YARN environment and adds the
+   * NodeManager {@code -D} options derived from the configuration, profile and ports.
+   *
+   * @param profile resource profile supplying cpu and memory values
+   * @param ports   ports assigned to the NodeManager
+   */
+  protected void generateEnvironment(ServiceResourceProfile profile, NMPorts ports) {
+    // Start from a clean slate: the map is instance state, so without this reset every
+    // call would append another copy of each -D option to YARN_NODEMANAGER_OPTS.
+    environment.clear();
+
+    // yarnEnvironment configuration from yaml file
+    Map<String, String> yarnEnvironmentMap = cfg.getYarnEnvironment();
+    if (yarnEnvironmentMap != null) {
+      environment.putAll(yarnEnvironmentMap);
+    }
+
+    String rmHostName = System.getProperty(KEY_YARN_RM_HOSTNAME);
+    if (rmHostName != null && !rmHostName.isEmpty()) {
+      addYarnNodemanagerOpt(KEY_YARN_RM_HOSTNAME, rmHostName);
+    }
+
+    if (cgroupsEnabled()) {
+      addYarnNodemanagerOpt(KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS, VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS);
+      addYarnNodemanagerOpt(KEY_YARN_NM_LCE_RH_CLASS, VAL_YARN_NM_LCE_RH_CLASS);
+
+      // TODO: Configure hierarchy
+      addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_HIERARCHY, VAL_YARN_NM_LCE_CGROUPS_HIERARCHY);
+      addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_MOUNT, "true");
+      // TODO: Make it configurable
+      addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH, VAL_YARN_NM_LCE_CGROUPS_MOUNT_PATH);
+      addYarnNodemanagerOpt(KEY_YARN_NM_LCE_GROUP, "root");
+      if (environment.containsKey("YARN_HOME")) {
+        addYarnNodemanagerOpt(KEY_YARN_HOME, environment.get("YARN_HOME"));
+      }
+    } else {
+      // Otherwise configure to use Default
+      addYarnNodemanagerOpt(KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS, DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS);
+    }
+    addYarnNodemanagerOpt(KEY_NM_RESOURCE_CPU_VCORES, Integer.toString(profile.getCpus().intValue()));
+    addYarnNodemanagerOpt(KEY_NM_RESOURCE_MEM_MB, Integer.toString(profile.getMemory().intValue()));
+    addYarnNodemanagerOpt(KEY_NM_ADDRESS, ALL_LOCAL_IPV4ADDR + Long.toString(ports.getRpcPort()));
+    addYarnNodemanagerOpt(KEY_NM_LOCALIZER_ADDRESS, ALL_LOCAL_IPV4ADDR + Long.toString(ports.getLocalizerPort()));
+    addYarnNodemanagerOpt(KEY_NM_WEBAPP_ADDRESS, ALL_LOCAL_IPV4ADDR + Long.toString(ports.getWebAppHttpPort()));
+    addYarnNodemanagerOpt(KEY_NM_SHUFFLE_PORT, Long.toString(ports.getShufflePort()));
+  }
+
+  /**
+   * Appends {@code env KEY="value" ...} for every entry of the environment map.
+   *
+   * @param cmdLine command line under construction
+   */
+  protected void appendEnvForNM(StringBuilder cmdLine) {
+    cmdLine.append(" env ");
+    for (Map.Entry<String, String> env : environment.entrySet()) {
+      cmdLine.append(env.getKey()).append("=").append("\"").append(env.getValue()).append("\" ");
+    }
+  }
+
+  /**
+   * When cgroups are enabled in the NodeManager configuration, appends the commands
+   * that export TASK_DIR and chmod the task's cpu cgroup directory.
+   *
+   * @param cmdLine command line under construction
+   */
+  protected void appendCgroupsCmds(StringBuilder cmdLine) {
+    if (cgroupsEnabled()) {
+      cmdLine.append(" export TASK_DIR=`basename $PWD`;");
+      cmdLine.append(" chmod +x /sys/fs/cgroup/cpu/mesos/$TASK_DIR;");
+    }
+  }
+
+  /**
+   * Appends an {@code export YARN_HOME=...;} statement when YARN_HOME is present in
+   * the environment map.
+   *
+   * @param cmdLine command line under construction
+   */
+  protected void appendYarnHomeExport(StringBuilder cmdLine) {
+    if (environment.containsKey("YARN_HOME")) {
+      cmdLine.append(" export YARN_HOME=").append(environment.get("YARN_HOME")).append(";");
+    }
+  }
+
+  /**
+   * Adds {@code -D<propertyName>=<propertyValue>} to the YARN_NODEMANAGER_OPTS
+   * environment entry, space-separated from any options already present.
+   *
+   * @param propertyName  name of the property
+   * @param propertyValue value of the property
+   */
+  protected void addYarnNodemanagerOpt(String propertyName, String propertyValue) {
+    String envOpt = String.format(PROPERTY_FORMAT, propertyName, propertyValue);
+    String existingOpts = environment.get(ENV_YARN_NODEMANAGER_OPTS);
+    // A missing (or null-valued) entry is replaced rather than concatenated, so the
+    // literal text "null" never leaks into the options string.
+    environment.put(ENV_YARN_NODEMANAGER_OPTS, existingOpts == null ? envOpt : existingOpts + " " + envOpt);
+  }
+
+  /**
+   * Returns the URL of the ResourceManager web app's {@code /conf} endpoint, using
+   * https when the YARN http policy is HTTPS-only and http otherwise. When no web app
+   * address is configured, the ResourceManager hostname with port 8090 (https) or
+   * 8088 (http) is used.
+   *
+   * @return configuration URL
+   */
+  @Override
+  public String getConfigurationUrl() {
+    YarnConfiguration conf = new YarnConfiguration();
+    String httpPolicy = conf.get(TaskFactory.YARN_HTTP_POLICY);
+    if (httpPolicy != null && httpPolicy.equals(TaskFactory.YARN_HTTP_POLICY_HTTPS_ONLY)) {
+      return "https://" + webAppAddress(conf, TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS, ":8090") + "/conf";
+    }
+    return "http://" + webAppAddress(conf, TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_ADDRESS, ":8088") + "/conf";
+  }
+
+  /** Reads the cgroups flag from the NodeManager configuration, defaulting to false. */
+  private boolean cgroupsEnabled() {
+    return cfg.getNodeManagerConfiguration().getCgroups().or(Boolean.FALSE);
+  }
+
+  /** Returns the configured address for the key, or RM hostname plus the port suffix when unset. */
+  private static String webAppAddress(YarnConfiguration conf, String addressKey, String portSuffix) {
+    String address = conf.get(addressKey);
+    if (address == null || address.isEmpty()) {
+      address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + portSuffix;
+    }
+    return address;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java
new file mode 100644
index 0000000..0944a2a
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import com.google.common.base.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Helper class for dynamically assigning ports to nodemanager
+ */
+public class NMPorts implements Ports {
+  private static final String NM_RPC_PORT_KEY = "nm.rpc.port";
+  private static final String NM_LOCALIZER_PORT_KEY = "nm.localizer.port";
+  private static final String NM_WEBAPP_HTTP_PORT_KEY = "nm.webapp.http.port";
+  private static final String NM_HTTP_SHUFFLE_PORT_KEY = "nm.http.shuffle.port";
+
+  /** Port keys in the order in which the constructor consumes the ports array. */
+  private static final String[] NM_PORT_KEYS = {
+      NM_RPC_PORT_KEY, NM_LOCALIZER_PORT_KEY, NM_WEBAPP_HTTP_PORT_KEY, NM_HTTP_SHUFFLE_PORT_KEY
+  };
+
+  private Map<String, Long> portsMap = new HashMap<>(NM_PORT_KEYS.length);
+
+  /**
+   * Assigns the given ports, by position, to the rpc, localizer, webapp-http and
+   * http-shuffle keys.
+   *
+   * @param ports port numbers; the array length must equal {@link #expectedNumPorts()}
+   */
+  public NMPorts(Long[] ports) {
+    Preconditions.checkState(ports.length == NM_PORT_KEYS.length, "NMPorts: array \"ports\" is of unexpected length");
+    int position = 0;
+    for (String key : NM_PORT_KEYS) {
+      portsMap.put(key, ports[position]);
+      position++;
+    }
+  }
+
+  /** @return the port stored under the rpc key */
+  public long getRpcPort() {
+    return portsMap.get(NM_RPC_PORT_KEY).longValue();
+  }
+
+  /** @return the port stored under the localizer key */
+  public long getLocalizerPort() {
+    return portsMap.get(NM_LOCALIZER_PORT_KEY).longValue();
+  }
+
+  /** @return the port stored under the webapp http key */
+  public long getWebAppHttpPort() {
+    return portsMap.get(NM_WEBAPP_HTTP_PORT_KEY).longValue();
+  }
+
+  /** @return the port stored under the http shuffle key */
+  public long getShufflePort() {
+    return portsMap.get(NM_HTTP_SHUFFLE_PORT_KEY).longValue();
+  }
+
+  /** @return the number of ports the constructor expects */
+  public static int expectedNumPorts() {
+    return NM_PORT_KEYS.length;
+  }
+
+  /**
+   * @return a string representation of NMPorts, formatted as {@code {key: port, ...}}
+   */
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder("{");
+    String separator = "";
+    for (String key : NM_PORT_KEYS) {
+      text.append(separator).append(key).append(": ").append(portsMap.get(key).toString());
+      separator = ", ";
+    }
+    return text.append("}").toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMProfile.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMProfile.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMProfile.java
new file mode 100644
index 0000000..3de82a5
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMProfile.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import com.google.gson.Gson;
+
+/**
+ * Node Manager Profile
+ */
+public class NMProfile {
+  /**
+   * Name of the profile.
+   */
+  private String name;
+
+  /**
+   * Number of CPU advertised to YARN Resource Manager.
+   */
+  private Long cpus;
+
+  /**
+   * Memory in MB advertised to YARN Resource Manager.
+   */
+  private Long memory;
+
+  /**
+   * Creates a profile from its name and resource sizes.
+   *
+   * @param name   profile name
+   * @param cpus   number of CPUs
+   * @param memory memory in MB
+   */
+  public NMProfile(String name, Long cpus, Long memory) {
+    this.name = name;
+    this.cpus = cpus;
+    this.memory = memory;
+  }
+
+  /** @return the profile name */
+  public String getName() {
+    return this.name;
+  }
+
+  /** @return the number of CPUs */
+  public Long getCpus() {
+    return this.cpus;
+  }
+
+  /** @return the memory in MB */
+  public Long getMemory() {
+    return this.memory;
+  }
+
+  /** @return this profile serialized to JSON with Gson */
+  @Override
+  public String toString() {
+    return new Gson().toJson(this);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMProfileManager.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMProfileManager.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMProfileManager.java
new file mode 100644
index 0000000..38fad1d
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMProfileManager.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import com.google.gson.Gson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Node Manager Profile Manager
+ */
+public class NMProfileManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(NMProfileManager.class);
+
+  /** Registered profiles keyed by profile name; the map itself is never replaced. */
+  private final Map<String, NMProfile> profiles = new ConcurrentHashMap<>();
+
+  /**
+   * Looks up a profile by name.
+   *
+   * @param name profile name
+   * @return the profile registered under that name, or {@code null} if there is none
+   */
+  public NMProfile get(String name) {
+    return profiles.get(name);
+  }
+
+  /**
+   * Registers a profile under its own name, replacing any profile already stored
+   * under that name.
+   *
+   * @param profile profile to register
+   */
+  public void add(NMProfile profile) {
+    LOGGER.info("Adding profile {} with CPU: {} and Memory: {}", profile.getName(), profile.getCpus(),
+        profile.getMemory());
+
+    profiles.put(profile.getName(), profile);
+  }
+
+  /**
+   * @param name profile name
+   * @return {@code true} if a profile is registered under that name
+   */
+  public boolean exists(String name) {
+    return this.profiles.containsKey(name);
+  }
+
+  /** @return this manager serialized to JSON with Gson */
+  @Override
+  public String toString() {
+    Gson gson = new Gson();
+    return gson.toJson(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactoryAnnotation.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactoryAnnotation.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactoryAnnotation.java
new file mode 100644
index 0000000..62d9f52
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactoryAnnotation.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.myriad.scheduler;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Target;
+import java.lang.annotation.Retention;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+
+/**
+ * NMTaskFactory annotation that allows binding a TaskFactory to an NM-specific
+ * implementation
+ */
+@BindingAnnotation
+@Retention(RUNTIME)
+@Target({FIELD, PARAMETER, METHOD})
+public @interface NMTaskFactoryAnnotation {
+  // Marker annotation: declares no elements.
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Ports.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Ports.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Ports.java
new file mode 100644
index 0000000..03150fb
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Ports.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+/**
+ * Generic interface to represent ports
+ */
+public interface Ports {
+  // Marker interface: intentionally declares no members.
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java
new file mode 100644
index 0000000..955bc77
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import javax.inject.Inject;
+import java.util.Set;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Rebalancer} is responsible for scaling registered YARN clusters as per
+ * configured rules and policies.
+ */
+public class Rebalancer implements Runnable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(Rebalancer.class);
+
+  private final org.apache.myriad.state.SchedulerState schedulerState;
+  private final MyriadOperations myriadOperations;
+  private final ServiceProfileManager profileManager;
+
+  /**
+   * @param schedulerState   source of the active and pending task ids
+   * @param myriadOperations used to flex the cluster up
+   * @param profileManager   used to look up the profile to flex up with
+   */
+  @Inject
+  public Rebalancer(org.apache.myriad.state.SchedulerState schedulerState, MyriadOperations myriadOperations,
+                    ServiceProfileManager profileManager) {
+    this.schedulerState = schedulerState;
+    this.myriadOperations = myriadOperations;
+    this.profileManager = profileManager;
+  }
+
+  /**
+   * Logs the number of active and pending NodeManager tasks and, when there are
+   * none of either, flexes the cluster up by one instance of the "small" profile.
+   */
+  @Override
+  public void run() {
+    final Set<Protos.TaskID> activeIds =
+        schedulerState.getActiveTaskIds(org.apache.myriad.configuration.NodeManagerConfiguration.NM_TASK_PREFIX);
+    final Set<Protos.TaskID> pendingIds =
+        schedulerState.getPendingTaskIds(org.apache.myriad.configuration.NodeManagerConfiguration.NM_TASK_PREFIX);
+
+    final int activeCount = activeIds.size();
+    final int pendingCount = pendingIds.size();
+    LOGGER.info("Active {}, Pending {}", activeCount, pendingCount);
+
+    final boolean nothingRunningOrQueued = activeCount == 0 && pendingCount == 0;
+    if (nothingRunningOrQueued) {
+      myriadOperations.flexUpCluster(profileManager.get("small"), 1, null);
+    }
+
+    // TODO: replace the bootstrap rule above with metrics-driven rebalancing. An earlier
+    // (disabled) sketch queried the ResourceManager REST service for cluster metrics and
+    // for apps in the ACCEPTED state, then applied these rules:
+    //   - activeNodes == 0 && appsPending > 0                     -> flex up by 1 ("small")
+    //   - appsPending == 0 && appsRunning == 0 && activeNodes > 0 -> flex down by 1
+    //   - acceptedApps > 0                                        -> flex up by 1 ("small")
+    //   - otherwise                                               -> nothing to rebalance
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ReconcileService.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ReconcileService.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ReconcileService.java
new file mode 100644
index 0000000..d0c74de
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ReconcileService.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.mesos.Protos;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+
+/**
+ * {@link ReconcileService} is responsible for reconciling tasks with the
+ * Mesos master
+ */
+public class ReconcileService {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ReconcileService.class);
+
+  public static final long DEFAULT_RECONCILATION_DELAY_MS = 10000;
+  public static final long MAX_RECONCILE_ATTEMPTS = 10;
+
+  private org.apache.myriad.state.SchedulerState state;
+  private MyriadConfiguration cfg;
+  private Date lastReconcileTime;
+
+  @Inject
+  public ReconcileService(org.apache.myriad.state.SchedulerState state, MyriadConfiguration cfg) {
+    this.state = state;
+    this.cfg = cfg;
+  }
+
+  /**
+   * Asks the driver to reconcile all task statuses known to the scheduler state, then
+   * retries up to {@link #MAX_RECONCILE_ATTEMPTS} times for the statuses whose
+   * timestamp is older than the previous reconcile request. The wait before attempt
+   * {@code n} is {@code n * DEFAULT_RECONCILATION_DELAY_MS}. Returns immediately when
+   * there are no task statuses, and stops early if the calling thread is interrupted.
+   *
+   * @param driver scheduler driver used to issue the reconcile requests
+   */
+  public void reconcile(SchedulerDriver driver) {
+    Collection<Protos.TaskStatus> taskStatuses = state.getTaskStatuses();
+
+    if (taskStatuses.isEmpty()) {
+      return;
+    }
+    LOGGER.info("Reconciling {} tasks.", taskStatuses.size());
+
+    driver.reconcileTasks(taskStatuses);
+
+    lastReconcileTime = new Date();
+
+    for (int attempt = 1; attempt <= MAX_RECONCILE_ATTEMPTS; attempt++) {
+      try {
+        // TODO(mohit): The delay grows linearly with the attempt number; maybe the
+        // backoff strategy should be configurable.
+        Thread.sleep(DEFAULT_RECONCILATION_DELAY_MS * attempt);
+      } catch (InterruptedException e) {
+        LOGGER.error("Interrupted", e);
+        // Restore the interrupt flag for the caller and stop retrying; continuing
+        // would make every later sleep fail immediately and hammer the driver.
+        Thread.currentThread().interrupt();
+        return;
+      }
+      Collection<Protos.TaskStatus> notYetReconciled = new ArrayList<>();
+      for (Protos.TaskStatus status : state.getTaskStatuses()) {
+        // NOTE(review): Date#getTime() is in milliseconds; Mesos appears to report
+        // TaskStatus timestamps in seconds - confirm the units of this comparison.
+        if (status.getTimestamp() < lastReconcileTime.getTime()) {
+          notYetReconciled.add(status);
+        }
+      }
+      LOGGER.info("Reconcile attempt {} for {} tasks", attempt, notYetReconciled.size());
+      driver.reconcileTasks(notYetReconciled);
+      lastReconcileTime = new Date();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java
new file mode 100644
index 0000000..0cbca37
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import org.apache.myriad.configuration.NodeManagerConfiguration;
+import org.apache.myriad.state.NodeTask;
+import org.apache.myriad.state.SchedulerState;
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+/**
+ * Provides utilities for scheduling with the mesos offers
+ */
+public class SchedulerUtils {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerUtils.class);
+
+  /**
+   * Checks whether none of the given tasks has the offer's hostname (compared
+   * ignoring case). A null or empty task collection counts as unique.
+   *
+   * @param offer        offer whose hostname is checked; must not be null
+   * @param taskToLaunch task to launch; not consulted by this check
+   * @param tasks        tasks to compare against; may be null or empty
+   * @return {@code true} if no task in {@code tasks} has the offer's hostname
+   */
+  public static boolean isUniqueHostname(Protos.OfferOrBuilder offer, NodeTask taskToLaunch,
+                                         Collection<NodeTask> tasks) {
+    Preconditions.checkArgument(offer != null);
+    final String offerHostname = offer.getHostname();
+
+    boolean unique = true;
+    if (!CollectionUtils.isEmpty(tasks)) {
+      for (NodeTask existing : tasks) {
+        if (offerHostname.equalsIgnoreCase(existing.getHostname())) {
+          unique = false;
+          break;
+        }
+      }
+    }
+
+    if (unique) {
+      LOGGER.debug("Offer's hostname {} is unique", offerHostname);
+    } else {
+      LOGGER.debug("Offer's hostname {} is not unique", offerHostname);
+    }
+    return unique;
+  }
+
+  /**
+   * Determines if a given host has a nodemanager running with zero profile. Node Managers
+   * launched with zero profile (zero cpu & memory) are eligible for fine grained scaling.
+   * Node Managers launched with a non-zero profile size are not eligible for fine grained
+   * scaling.
+   *
+   * @param hostName hostname to look for among the active NodeManager tasks
+   * @param state    scheduler state supplying the active NodeManager tasks
+   * @return {@code true} if an active NodeManager task on {@code hostName} has a profile
+   *         with zero cpus and zero memory
+   */
+  public static boolean isEligibleForFineGrainedScaling(String hostName, SchedulerState state) {
+    for (NodeTask nmTask : state.getActiveTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX)) {
+      if (nmTask.getProfile().getCpus() != 0) {
+        continue;
+      }
+      if (nmTask.getProfile().getMemory() != 0) {
+        continue;
+      }
+      if (nmTask.getHostname().equals(hostName)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java
new file mode 100644
index 0000000..6fd8872
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import org.apache.myriad.configuration.MyriadConfiguration;
+
+/**
+ * CommandLineGenerator for any aux service launched by Myriad as binary distro
+ */
+public class ServiceCommandLineGenerator extends DownloadNMExecutorCLGenImpl {
+
+  /**
+   * @param cfg            Myriad configuration, handed to the superclass
+   * @param nodeManagerUri URI of the binary distribution, handed to the
+   *                       superclass
+   */
+  public ServiceCommandLineGenerator(MyriadConfiguration cfg,
+      String nodeManagerUri) {
+    super(cfg, nodeManagerUri);
+  }
+
+  /**
+   * Produces the command-line text made of the distro extraction commands
+   * followed by whatever {@code appendUser} contributes. Neither
+   * {@code profile} nor {@code ports} is read by this implementation.
+   *
+   * @param profile resource profile of the service (unused here)
+   * @param ports   ports of the service (unused here)
+   * @return the accumulated command-line text
+   */
+  @Override
+  public String generateCommandLine(ServiceResourceProfile profile,
+      Ports ports) {
+    final StringBuilder commandLine = new StringBuilder();
+    appendDistroExtractionCommands(commandLine);
+    appendUser(commandLine);
+    return commandLine.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceProfileManager.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceProfileManager.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceProfileManager.java
new file mode 100644
index 0000000..5319593
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceProfileManager.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+
+/**
+ * Class to keep all the ServiceResourceProfiles together
+ */
+public class ServiceProfileManager {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(ServiceProfileManager.class);
+
+  /** Registered profiles keyed by profile name; the map is never replaced. */
+  private final Map<String, ServiceResourceProfile> profiles =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Looks up a profile by name.
+   *
+   * @param name profile name
+   * @return the profile registered under {@code name}, or null if there is
+   *         none
+   */
+  public ServiceResourceProfile get(String name) {
+    return profiles.get(name);
+  }
+
+  /**
+   * Registers a profile under its own name, replacing any profile previously
+   * registered under that name.
+   *
+   * @param profile profile to register
+   */
+  public void add(ServiceResourceProfile profile) {
+    LOGGER.info("Adding profile {} with CPU: {} and Memory: {}",
+        profile.getName(), profile.getCpus(), profile.getMemory());
+    profiles.put(profile.getName(), profile);
+  }
+
+  /**
+   * @param name profile name
+   * @return true if a profile is registered under {@code name}
+   */
+  public boolean exists(String name) {
+    return this.profiles.containsKey(name);
+  }
+
+  /**
+   * @return this manager rendered as JSON by Gson
+   */
+  @Override
+  public String toString() {
+    Gson gson = new Gson();
+    return gson.toJson(this);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java
new file mode 100644
index 0000000..021007b
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import java.lang.reflect.Type;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParseException;
+
+/**
+ * Resource Profile for any service
+ */
+public class ServiceResourceProfile {
+
+  protected final String name;
+
+  /**
+   * Number of CPU needed to run a service
+   */
+  protected final Double cpus;
+
+  /**
+   * Memory in MB needed to run a service
+   */
+  protected final Double memory;
+
+  protected Double executorCpu = 0.0;
+
+  protected Double executorMemory = 0.0;
+
+  /**
+   * Fully qualified name of the class that created this profile. It is
+   * read back by {@link CustomDeserializer} from the "className" JSON
+   * property to pick the class to deserialize into.
+   */
+  protected String className;
+
+  /**
+   * @param name profile name
+   * @param cpu  number of CPU needed to run the service
+   * @param mem  memory in MB needed to run the service
+   */
+  public ServiceResourceProfile(String name, Double cpu, Double mem) {
+    this.name = name;
+    this.cpus = cpu;
+    this.memory = mem;
+    this.className = ServiceResourceProfile.class.getName();
+  }
+
+
+  public String getName() {
+    return name;
+  }
+
+  public Double getCpus() {
+    return cpus;
+  }
+
+  public Double getMemory() {
+    return memory;
+  }
+
+  /**
+   * @return in this class, the same value as {@link #getMemory()}
+   */
+  public Double getAggregateMemory() {
+    return memory;
+  }
+
+  /**
+   * @return in this class, the same value as {@link #getCpus()}
+   */
+  public Double getAggregateCpu() {
+    return cpus;
+  }
+
+  public Double getExecutorCpu() {
+    return executorCpu;
+  }
+
+  public void setExecutorCpu(Double executorCpu) {
+    this.executorCpu = executorCpu;
+  }
+
+  public Double getExecutorMemory() {
+    return executorMemory;
+  }
+
+  public void setExecutorMemory(Double executorMemory) {
+    this.executorMemory = executorMemory;
+  }
+
+
+  /**
+   * @return this profile rendered as JSON by Gson
+   */
+  @Override
+  public String toString() {
+    Gson gson = new Gson();
+    return gson.toJson(this);
+  }
+
+  /**
+   * Custom deserializer to help with deserialization of class hierarchy
+   * since at the point of deserialization we don't know class of the data
+   * that is being deserialized
+   */
+  public static class CustomDeserializer implements
+      JsonDeserializer<ServiceResourceProfile> {
+    private static final Logger LOGGER =
+        LoggerFactory.getLogger(CustomDeserializer.class);
+
+    /**
+     * Deserializes a profile into the class named by its "className"
+     * property.
+     *
+     * @param json    serialized profile; must be a JSON object
+     * @param typeOfT type requested by the caller
+     * @param context context used to deserialize subclasses
+     * @return the deserialized profile, or null if the named class cannot be
+     *         loaded (the failure is logged)
+     * @throws JsonParseException if "className" is missing or null, or names
+     *                            a class that is not a ServiceResourceProfile
+     */
+    @Override
+    public ServiceResourceProfile deserialize(JsonElement json, Type typeOfT,
+        JsonDeserializationContext context) throws JsonParseException {
+      JsonElement classNameElement = json.getAsJsonObject().get("className");
+      if (classNameElement == null || classNameElement.isJsonNull()) {
+        // Report the malformed input instead of failing with a bare NPE.
+        throw new JsonParseException(
+            "Serialized profile has no \"className\" property: " + json);
+      }
+      String type = classNameElement.getAsString();
+      try {
+        Class<?> c = Class.forName(type);
+        if (ServiceResourceProfile.class.equals(c)) {
+          // NOTE(review): a fresh Gson is used here, presumably so that the
+          // base class is not routed back into this deserializer - confirm
+          // against where the deserializer is registered.
+          return new Gson().fromJson(json, typeOfT);
+        }
+        if (!ServiceResourceProfile.class.isAssignableFrom(c)) {
+          // Previously surfaced as a ClassCastException on assignment.
+          throw new JsonParseException(
+              "Class " + type + " is not a ServiceResourceProfile");
+        }
+        ServiceResourceProfile profile = context.deserialize(json, c);
+        return profile;
+      } catch (ClassNotFoundException e) {
+        LOGGER.error("Classname is not found", e);
+      }
+      return null;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java
new file mode 100644
index 0000000..544c47f
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import java.util.Map;
+
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.configuration.ServiceConfiguration;
+
+/**
+ * ServiceTaskConstraints is an implementation of TaskConstraints for a service
+ * at this point constraints are on ports
+ * Later on there may be other types of constraints added
+ */
+public class ServiceTaskConstraints implements TaskConstraints {
+
+  /** Number of ports configured for the service; 0 when none are found. */
+  private final int portsCount;
+
+  /**
+   * Reads the number of configured ports for the service registered under
+   * {@code taskPrefix}. The count is 0 when there are no service
+   * configurations, no configuration for this prefix, or no ports in it.
+   *
+   * @param cfg        Myriad configuration holding the service configurations
+   * @param taskPrefix key of the service within the service configurations
+   */
+  public ServiceTaskConstraints(MyriadConfiguration cfg, String taskPrefix) {
+    final Map<String, ServiceConfiguration> services =
+        cfg.getServiceConfigurations();
+    final ServiceConfiguration service =
+        services == null ? null : services.get(taskPrefix);
+
+    if (service != null && service.getPorts().isPresent()) {
+      this.portsCount = service.getPorts().get().size();
+    } else {
+      this.portsCount = 0;
+    }
+  }
+
+  /**
+   * @return number of ports configured for the service
+   */
+  @Override
+  public int portsCount() {
+    return portsCount;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java
new file mode 100644
index 0000000..d7ca31d
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import javax.inject.Inject;
+
+import org.apache.mesos.Protos.CommandInfo;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.Resource;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.Value;
+import org.apache.mesos.Protos.CommandInfo.URI;
+import org.apache.mesos.Protos.Value.Scalar;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.configuration.MyriadExecutorConfiguration;
+import org.apache.myriad.configuration.ServiceConfiguration;
+import org.apache.myriad.state.NodeTask;
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Generic Service Class that allows to create a service solely based on the
+ * configuration.
+ * Main properties of configuration are:
+ * 1. command to run
+ * 2. Additional env. variables to set (serviceOpts)
+ * 3. ports to use with names of the properties
+ * 4. TODO (yufeldman) executor info
+ */
+public class ServiceTaskFactoryImpl implements TaskFactory {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(ServiceTaskFactoryImpl.class);
+
+  /**
+   * Configured port value that is replaced by a port taken from the offer.
+   */
+  public static final long DEFAULT_PORT_NUMBER = 0;
+
+  private final MyriadConfiguration cfg;
+  @SuppressWarnings("unused")
+  private final TaskUtils taskUtils;
+  private final ServiceCommandLineGenerator clGenerator;
+
+  @Inject
+  public ServiceTaskFactoryImpl(MyriadConfiguration cfg, TaskUtils taskUtils) {
+    this.cfg = cfg;
+    this.taskUtils = taskUtils;
+    this.clGenerator = new ServiceCommandLineGenerator(cfg,
+        cfg.getMyriadExecutorConfiguration().getNodeManagerUri().orNull());
+  }
+
+  /**
+   * Builds the Mesos task for a configured service: an {@code env ...}
+   * command line plus cpu, memory and (when taken from the offer) port
+   * resources.
+   *
+   * @param offer       offer the task is launched on; must not be null
+   * @param frameworkId framework id (not read by this implementation)
+   * @param taskId      id assigned to the created task
+   * @param nodeTask    task description; its prefix selects the service
+   *                    configuration and its profile supplies cpu and memory
+   * @return the task to launch
+   * @throws NullPointerException  if the offer, the task, the service
+   *                               configuration or its command is missing
+   * @throws IllegalStateException if the offer has fewer ports than the
+   *                               service needs to take from it
+   */
+  @Override
+  public TaskInfo createTask(Offer offer, FrameworkID frameworkId,
+      TaskID taskId, NodeTask nodeTask) {
+    Objects.requireNonNull(offer, "Offer should be non-null");
+    Objects.requireNonNull(nodeTask, "NodeTask should be non-null");
+
+    ServiceConfiguration serviceConfig =
+        cfg.getServiceConfiguration(nodeTask.getTaskPrefix());
+
+    Objects.requireNonNull(serviceConfig, "ServiceConfig should be non-null");
+    Objects.requireNonNull(serviceConfig.getCommand().orNull(),
+        "command for ServiceConfig should be non-null");
+
+    final String serviceHostName = "0.0.0.0";
+    final String serviceEnv = serviceConfig.getEnvSettings();
+    final String rmHostName =
+        System.getProperty(YARN_RESOURCEMANAGER_HOSTNAME);
+    List<Long> additionalPortsNumbers = null;
+
+    final StringBuilder strB = new StringBuilder("env ");
+    if (serviceConfig.getServiceOpts() != null) {
+      strB.append(serviceConfig.getServiceOpts()).append("=");
+
+      strB.append("\"");
+      if (rmHostName != null && !rmHostName.isEmpty()) {
+        strB.append("-D" + YARN_RESOURCEMANAGER_HOSTNAME + "=" + rmHostName
+            + " ");
+      }
+
+      Map<String, Long> ports = serviceConfig.getPorts().orNull();
+      if (ports != null && !ports.isEmpty()) {
+        int neededPortsCount = 0;
+        for (Long port : ports.values()) {
+          if (isDynamicPort(port)) {
+            neededPortsCount++;
+          }
+        }
+        additionalPortsNumbers = getAvailablePorts(offer, neededPortsCount);
+        if (additionalPortsNumbers.size() < neededPortsCount) {
+          // Previously surfaced as an IndexOutOfBoundsException in the loop
+          // below, without saying what was missing.
+          throw new IllegalStateException("Offer provides only "
+              + additionalPortsNumbers.size() + " port(s) but service "
+              + nodeTask.getTaskPrefix() + " needs " + neededPortsCount);
+        }
+        if (neededPortsCount > 0) {
+          // Only report offer ports when some were actually taken.
+          LOGGER.info("No specified ports found or number of specified ports is "
+              + "not enough. Using ports from Mesos Offers: {}",
+              additionalPortsNumbers);
+        }
+        int index = 0;
+        for (Map.Entry<String, Long> portEntry : ports.entrySet()) {
+          String portProperty = portEntry.getKey();
+          Long port = portEntry.getValue();
+          if (isDynamicPort(port)) {
+            port = additionalPortsNumbers.get(index++);
+          }
+          strB.append("-D" + portProperty + "=" + serviceHostName + ":" + port
+              + " ");
+        }
+      }
+      if (serviceEnv != null) {
+        // StringBuilder.append(null) would put the text "null" in the command.
+        strB.append(serviceEnv);
+      }
+      strB.append("\"");
+    }
+
+    strB.append(" ");
+    strB.append(serviceConfig.getCommand().get());
+
+    CommandInfo commandInfo = createCommandInfo(nodeTask.getProfile(),
+        strB.toString());
+
+    LOGGER.info("Command line for service: {} is: {}",
+        nodeTask.getTaskPrefix(), strB.toString());
+
+    Scalar taskMemory = Scalar.newBuilder()
+        .setValue(nodeTask.getProfile().getMemory()).build();
+    Scalar taskCpus = Scalar.newBuilder()
+        .setValue(nodeTask.getProfile().getCpus()).build();
+
+    TaskInfo.Builder taskBuilder = TaskInfo.newBuilder();
+
+    taskBuilder.setName(nodeTask.getTaskPrefix())
+        .setTaskId(taskId)
+        .setSlaveId(offer.getSlaveId())
+        .addResources(Resource.newBuilder().setName("cpus")
+            .setType(Value.Type.SCALAR).setScalar(taskCpus).build())
+        .addResources(Resource.newBuilder().setName("mem")
+            .setType(Value.Type.SCALAR).setScalar(taskMemory).build());
+
+    if (additionalPortsNumbers != null && !additionalPortsNumbers.isEmpty()) {
+      // set ports
+      Value.Ranges.Builder valueRanger = Value.Ranges.newBuilder();
+      for (Long port : additionalPortsNumbers) {
+        valueRanger.addRange(
+            Value.Range.newBuilder().setBegin(port).setEnd(port));
+      }
+
+      taskBuilder.addResources(Resource.newBuilder().setName("ports")
+          .setType(Value.Type.RANGES).setRanges(valueRanger.build()));
+    }
+    taskBuilder.setCommand(commandInfo);
+    return taskBuilder.build();
+  }
+
+  /**
+   * Wraps the service command into a Mesos CommandInfo. The configured YARN
+   * environment, if any, is merged in. When a node manager URI is
+   * configured, the generated distribution commands are prepended and the
+   * distribution and configuration URIs are attached.
+   *
+   * @param profile     profile handed to the command line generator
+   * @param executorCmd command line of the service itself
+   * @return the command to run on the slave
+   * @throws RuntimeException if a node manager URI is configured but
+   *                          frameworkUser and/or frameworkSuperUser is not
+   */
+  @VisibleForTesting
+  CommandInfo createCommandInfo(ServiceResourceProfile profile,
+      String executorCmd) {
+    MyriadExecutorConfiguration myriadExecutorConfiguration =
+        cfg.getMyriadExecutorConfiguration();
+    CommandInfo.Builder commandInfo = CommandInfo.newBuilder();
+    Map<String, String> envVars = cfg.getYarnEnvironment();
+    if (envVars != null && !envVars.isEmpty()) {
+      org.apache.mesos.Protos.Environment.Builder yarnHomeB =
+          org.apache.mesos.Protos.Environment.newBuilder();
+      for (Map.Entry<String, String> envEntry : envVars.entrySet()) {
+        org.apache.mesos.Protos.Environment.Variable.Builder yarnEnvB =
+            org.apache.mesos.Protos.Environment.Variable.newBuilder();
+        yarnEnvB.setName(envEntry.getKey()).setValue(envEntry.getValue());
+        yarnHomeB.addVariables(yarnEnvB.build());
+      }
+      commandInfo.mergeEnvironment(yarnHomeB.build());
+    }
+
+    if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
+      //Both FrameworkUser and FrameworkSuperuser to get all of the directory
+      //permissions correct.
+      if (!(cfg.getFrameworkUser().isPresent()
+          && cfg.getFrameworkSuperUser().isPresent())) {
+        throw new RuntimeException("Trying to use remote distribution, but "
+            + "frameworkUser and/or frameworkSuperUser not set!");
+      }
+
+      LOGGER.info("Using remote distribution");
+      String clGeneratedCommand = clGenerator.generateCommandLine(profile,
+          null);
+
+      String nmURIString =
+          myriadExecutorConfiguration.getNodeManagerUri().get();
+
+      //Concatenate all the subcommands
+      String cmd = clGeneratedCommand + " " + executorCmd;
+
+      //get the nodemanagerURI
+      //We're going to extract ourselves, so setExtract is false
+      LOGGER.info("Getting Hadoop distribution from:{}", nmURIString);
+      URI nmUri =
+          URI.newBuilder().setValue(nmURIString).setExtract(false).build();
+
+      //get configs directly from resource manager
+      String configUrlString = clGenerator.getConfigurationUrl();
+      LOGGER.info("Getting config from:{}", configUrlString);
+      URI configUri = URI.newBuilder().setValue(configUrlString).build();
+
+      LOGGER.info("Slave will execute command:{}", cmd);
+      commandInfo.addUris(nmUri).addUris(configUri)
+          .setValue("echo \"" + cmd + "\";" + cmd);
+      commandInfo.setUser(cfg.getFrameworkSuperUser().get());
+
+    } else {
+      commandInfo.setValue(executorCmd);
+    }
+    return commandInfo.build();
+  }
+
+  @Override
+  public ExecutorInfo getExecutorInfoForSlave(FrameworkID frameworkId,
+      Offer offer, CommandInfo commandInfo) {
+    // TODO (yufeldman) if executor specified use it , otherwise return null
+    // nothing to implement here, since we are using default slave executor
+    return null;
+  }
+
+  /**
+   * Tells whether a configured port has to be replaced by one from the offer.
+   * NOTE(review): a null value is treated like DEFAULT_PORT_NUMBER instead of
+   * failing on unboxing - confirm this matches the intended configuration
+   * semantics.
+   */
+  private static boolean isDynamicPort(Long port) {
+    return port == null || port == DEFAULT_PORT_NUMBER;
+  }
+
+  /**
+   * Helper method to reserve ports
+   *
+   * @param offer          offer whose "ports" ranges are consumed in order
+   * @param requestedPorts number of ports wanted
+   * @return up to {@code requestedPorts} ports taken from the offer; never
+   *         null, empty when none are requested, and shorter than requested
+   *         when the offer does not hold enough ports
+   */
+  private List<Long> getAvailablePorts(Offer offer, int requestedPorts) {
+    final List<Long> returnedPorts = new ArrayList<>();
+    if (requestedPorts == 0) {
+      return returnedPorts;
+    }
+    for (Resource resource : offer.getResourcesList()) {
+      if (resource.getName().equals("ports")) {
+        Iterator<Value.Range> itr =
+            resource.getRanges().getRangeList().iterator();
+        while (itr.hasNext()) {
+          Value.Range range = itr.next();
+          if (range.getBegin() <= range.getEnd()) {
+            long i = range.getBegin();
+            while (i <= range.getEnd()
+                && returnedPorts.size() < requestedPorts) {
+              returnedPorts.add(i);
+              i++;
+            }
+            if (returnedPorts.size() >= requestedPorts) {
+              return returnedPorts;
+            }
+          }
+        }
+      }
+    }
+    // this is actually an error condition - we did not have enough ports to
+    // use; the caller checks the size of the returned list
+    return returnedPorts;
+  }
+}


Reply via email to