http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/state/Cluster.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/Cluster.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/Cluster.java
deleted file mode 100644
index 687b491..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/Cluster.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.state;
-
-import com.google.gson.Gson;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.UUID;
-
-/**
- * Model which represents the configuration of a cluster
- */
-public class Cluster {
-  private String clusterId;
-  private String clusterName;
-  private Collection<NodeTask> nodes;
-  private String resourceManagerHost;
-  private String resourceManagerPort;
-  private double minQuota;
-
-  public Cluster() {
-    this.clusterId = UUID.randomUUID().toString();
-    this.nodes = new HashSet<>();
-  }
-
-  public String getClusterId() {
-    return clusterId;
-  }
-
-  public String getClusterName() {
-    return clusterName;
-  }
-
-  public void setClusterName(String clusterName) {
-    this.clusterName = clusterName;
-  }
-
-  public Collection<NodeTask> getNodes() {
-    return nodes;
-  }
-
-  public void addNode(NodeTask node) {
-    this.nodes.add(node);
-  }
-
-  public void addNodes(Collection<NodeTask> nodes) {
-    this.nodes.addAll(nodes);
-  }
-
-  public void removeNode(NodeTask task) {
-    this.nodes.remove(task);
-  }
-
-  public String getResourceManagerHost() {
-    return resourceManagerHost;
-  }
-
-  public void setResourceManagerHost(String resourceManagerHost) {
-    this.resourceManagerHost = resourceManagerHost;
-  }
-
-  public String getResourceManagerPort() {
-    return resourceManagerPort;
-  }
-
-  public void setResourceManagerPort(String resourceManagerPort) {
-    this.resourceManagerPort = resourceManagerPort;
-  }
-
-  public double getMinQuota() {
-    return minQuota;
-  }
-
-  public void setMinQuota(double minQuota) {
-    this.minQuota = minQuota;
-  }
-
-  public String toString() {
-    Gson gson = new Gson();
-    return gson.toJson(this);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadState.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadState.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadState.java
deleted file mode 100644
index 5e868f9..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadState.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.state;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-
-import org.apache.mesos.Protos;
-import org.apache.mesos.state.State;
-import org.apache.mesos.state.Variable;
-
-import java.util.concurrent.ExecutionException;
-
-/**
- * Model that represents the state of Myriad
- */
-public class MyriadState {
-  public static final String KEY_FRAMEWORK_ID = "frameworkId";
-
-  private State stateStore;
-
-  public MyriadState(State stateStore) {
-    this.stateStore = stateStore;
-  }
-
-  public Protos.FrameworkID getFrameworkID() throws InterruptedException, ExecutionException, InvalidProtocolBufferException {
-    byte[] frameworkId = stateStore.fetch(KEY_FRAMEWORK_ID).get().value();
-
-    if (frameworkId.length > 0) {
-      return Protos.FrameworkID.parseFrom(frameworkId);
-    } else {
-      return null;
-    }
-  }
-
-  public void setFrameworkId(Protos.FrameworkID newFrameworkId) throws InterruptedException, ExecutionException {
-    Variable frameworkId = stateStore.fetch(KEY_FRAMEWORK_ID).get();
-    frameworkId = frameworkId.mutate(newFrameworkId.toByteArray());
-    stateStore.store(frameworkId).get();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java
deleted file mode 100644
index 99ab327..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.ebay.myriad.state;
-
-import com.ebay.myriad.state.utils.StoreContext;
-
-/**
- * Interface implemented by all Myriad State Store implementations
- */
-public interface MyriadStateStore {
-
-  StoreContext loadMyriadState() throws Exception;
-
-  void storeMyriadState(StoreContext storeContext) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
deleted file mode 100644
index d784092..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.state;
-
-import com.ebay.myriad.scheduler.constraints.Constraint;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import java.util.List;
-import com.ebay.myriad.scheduler.ServiceResourceProfile;
-import com.ebay.myriad.scheduler.TaskUtils;
-import com.google.inject.Inject;
-
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.Attribute;
-
-/**
- * Represents a task to be launched by the executor
- */
-public class NodeTask {
-  @JsonProperty
-  private String hostname;
-  @JsonProperty
-  private Protos.SlaveID slaveId;
-  @JsonProperty
-  private Protos.TaskStatus taskStatus;
-  @JsonProperty
-  private String taskPrefix;
-  @JsonProperty
-  private ServiceResourceProfile serviceresourceProfile;
-
-  @Inject
-  TaskUtils taskUtils;
-  /**
-   * Mesos executor for this node.
-   */
-  private Protos.ExecutorInfo executorInfo;
-
-  private Constraint constraint;
-  private List<Attribute> slaveAttributes;
-
-  public NodeTask(ServiceResourceProfile profile, Constraint constraint) {
-    this.serviceresourceProfile = profile;
-    this.hostname = "";
-    this.constraint = constraint;
-  }
-
-  public Protos.SlaveID getSlaveId() {
-    return slaveId;
-  }
-
-  public void setSlaveId(Protos.SlaveID slaveId) {
-    this.slaveId = slaveId;
-  }
-
-  public Constraint getConstraint() {
-    return constraint;
-  }
-
-  public String getHostname() {
-    return this.hostname;
-  }
-
-  public void setHostname(String hostname) {
-    this.hostname = hostname;
-  }
-
-  public Protos.TaskStatus getTaskStatus() {
-    return taskStatus;
-  }
-
-  public void setTaskStatus(Protos.TaskStatus taskStatus) {
-    this.taskStatus = taskStatus;
-  }
-
-  public Protos.ExecutorInfo getExecutorInfo() {
-    return executorInfo;
-  }
-
-  public void setExecutorInfo(Protos.ExecutorInfo executorInfo) {
-    this.executorInfo = executorInfo;
-  }
-
-  public void setSlaveAttributes(List<Attribute> slaveAttributes) {
-    this.slaveAttributes = slaveAttributes;
-  }
-
-  public List<Attribute> getSlaveAttributes() {
-    return slaveAttributes;
-  }
-
-  public String getTaskPrefix() {
-    return taskPrefix;
-  }
-
-  public void setTaskPrefix(String taskPrefix) {
-    this.taskPrefix = taskPrefix;
-  }
-
-  public ServiceResourceProfile getProfile() {
-    return serviceresourceProfile;
-  }
-
-  public void setProfile(ServiceResourceProfile serviceresourceProfile) {
-    this.serviceresourceProfile = serviceresourceProfile;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
deleted file mode 100644
index 99a7506..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
+++ /dev/null
@@ -1,549 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.state;
-
-import com.ebay.myriad.scheduler.ServiceResourceProfile;
-import com.ebay.myriad.state.utils.StoreContext;
-import com.google.common.collect.Sets;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.regex.Pattern;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.SlaveID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Represents the state of the Myriad scheduler
- */
-public class SchedulerState {
-  private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerState.class);
-
-  private static Pattern taskIdPattern = Pattern.compile("\\.");
-
-  private Map<Protos.TaskID, NodeTask> tasks;
-  private Protos.FrameworkID frameworkId;
-  private MyriadStateStore stateStore;
-  private Map<String, SchedulerStateForType> statesForTaskType;
-
-  public SchedulerState(MyriadStateStore stateStore) {
-    this.tasks = new ConcurrentHashMap<>();
-    this.stateStore = stateStore;
-    this.statesForTaskType = new ConcurrentHashMap<>();
-    loadStateStore();
-  }
-
-  /**
-   * Making method synchronized, so if someone tries flexup/down at the same time
-   * addNodes and removeTask will not put data into an inconsistent state
-   *
-   * @param nodes
-   */
-  public synchronized void addNodes(Collection<NodeTask> nodes) {
-    if (CollectionUtils.isEmpty(nodes)) {
-      LOGGER.info("No nodes to add");
-      return;
-    }
-    for (NodeTask node : nodes) {
-      Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue(String.format("%s.%s.%s", node.getTaskPrefix(), node.getProfile().getName(), UUID.randomUUID())).build();
-      addTask(taskId, node);
-      SchedulerStateForType taskState = this.statesForTaskType.get(node.getTaskPrefix());
-      LOGGER.info("Marked taskId {} pending, size of pending queue for {} is: {}", taskId.getValue(), node.getTaskPrefix(), (taskState == null ? 0 : taskState.getPendingTaskIds().size()));
-      makeTaskPending(taskId);
-    }
-
-  }
-
-  // TODO (sdaingade) Clone NodeTask
-  public synchronized void addTask(Protos.TaskID taskId, NodeTask node) {
-    this.tasks.put(taskId, node);
-    updateStateStore();
-  }
-
-  public synchronized void updateTask(Protos.TaskStatus taskStatus) {
-    Objects.requireNonNull(taskStatus, "TaskStatus object shouldn't be null");
-    Protos.TaskID taskId = taskStatus.getTaskId();
-    if (this.tasks.containsKey(taskId)) {
-      this.tasks.get(taskId).setTaskStatus(taskStatus);
-    }
-    updateStateStore();
-  }
-
-  public synchronized void makeTaskPending(Protos.TaskID taskId) {
-    Objects.requireNonNull(taskId, "taskId cannot be empty or null");
-    String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
-    SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
-    if (taskTypeState == null) {
-      taskTypeState = new SchedulerStateForType(taskPrefix);
-      statesForTaskType.put(taskPrefix, taskTypeState);
-    }
-    taskTypeState.makeTaskPending(taskId);
-    updateStateStore();
-  }
-
-  public synchronized void makeTaskStaging(Protos.TaskID taskId) {
-    Objects.requireNonNull(taskId, "taskId cannot be empty or null");
-    String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
-    SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
-    if (taskTypeState == null) {
-      taskTypeState = new SchedulerStateForType(taskPrefix);
-      statesForTaskType.put(taskPrefix, taskTypeState);
-    }
-    taskTypeState.makeTaskStaging(taskId);
-    updateStateStore();
-  }
-
-  public synchronized void makeTaskActive(Protos.TaskID taskId) {
-    Objects.requireNonNull(taskId, "taskId cannot be empty or null");
-    String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
-    SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
-    if (taskTypeState == null) {
-      taskTypeState = new SchedulerStateForType(taskPrefix);
-      statesForTaskType.put(taskPrefix, taskTypeState);
-    }
-    taskTypeState.makeTaskActive(taskId);
-    updateStateStore();
-  }
-
-  public synchronized void makeTaskLost(Protos.TaskID taskId) {
-    Objects.requireNonNull(taskId, "taskId cannot be empty or null");
-    String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
-    SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
-    if (taskTypeState == null) {
-      taskTypeState = new SchedulerStateForType(taskPrefix);
-      statesForTaskType.put(taskPrefix, taskTypeState);
-    }
-    taskTypeState.makeTaskLost(taskId);
-    updateStateStore();
-  }
-
-  public synchronized void makeTaskKillable(Protos.TaskID taskId) {
-    Objects.requireNonNull(taskId, "taskId cannot be empty or null");
-    String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
-    SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
-    if (taskTypeState == null) {
-      taskTypeState = new SchedulerStateForType(taskPrefix);
-      statesForTaskType.put(taskPrefix, taskTypeState);
-    }
-    taskTypeState.makeTaskKillable(taskId);
-    updateStateStore();
-  }
-
-  // TODO (sdaingade) Clone NodeTask
-  public synchronized NodeTask getTask(Protos.TaskID taskId) {
-    return this.tasks.get(taskId);
-  }
-
-  public synchronized Set<Protos.TaskID> getKillableTasks() {
-    Set<Protos.TaskID> returnSet = new HashSet<>();
-    for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) {
-      returnSet.addAll(entry.getValue().getKillableTasks());
-    }
-    return returnSet;
-  }
-
-  public synchronized Set<Protos.TaskID> getKillableTasks(String taskPrefix) {
-    SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
-    return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getKillableTasks());
-  }
-
-  public synchronized void removeTask(Protos.TaskID taskId) {
-    String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
-    SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
-    if (taskTypeState != null) {
-      taskTypeState.removeTask(taskId);
-    }
-    this.tasks.remove(taskId);
-    updateStateStore();
-  }
-
-  public synchronized Set<Protos.TaskID> getPendingTaskIds() {
-    Set<Protos.TaskID> returnSet = new HashSet<>();
-    for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) {
-      returnSet.addAll(entry.getValue().getPendingTaskIds());
-    }
-    return returnSet;
-  }
-
-  public synchronized Collection<Protos.TaskID> getPendingTaskIDsForProfile(ServiceResourceProfile serviceProfile) {
-    List<Protos.TaskID> pendingTaskIds = new ArrayList<>();
-    Set<Protos.TaskID> pendingTasks = getPendingTaskIds();
-    for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
-      NodeTask nodeTask = entry.getValue();
-      if (pendingTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) {
-        pendingTaskIds.add(entry.getKey());
-      }
-    }
-    return Collections.unmodifiableCollection(pendingTaskIds);
-  }
-
-  public synchronized Set<Protos.TaskID> getPendingTaskIds(String taskPrefix) {
-    SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
-    return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getPendingTaskIds());
-  }
-
-  public synchronized Set<Protos.TaskID> getActiveTaskIds() {
-    Set<Protos.TaskID> returnSet = new HashSet<>();
-    for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) {
-      returnSet.addAll(entry.getValue().getActiveTaskIds());
-    }
-    return returnSet;
-  }
-
-  public synchronized Set<Protos.TaskID> getActiveTaskIds(String taskPrefix) {
-    SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
-    return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getActiveTaskIds());
-  }
-
-  public synchronized Set<NodeTask> getActiveTasks() {
-    return getTasks(getActiveTaskIds());
-  }
-
-  public Set<NodeTask> getActiveTasksByType(String taskPrefix) {
-    return getTasks(getActiveTaskIds(taskPrefix));
-  }
-
-  public Set<NodeTask> getStagingTasks() {
-    return getTasks(getStagingTaskIds());
-  }
-
-  public Set<NodeTask> getStagingTasksByType(String taskPrefix) {
-    return getTasks(getStagingTaskIds(taskPrefix));
-  }
-
-  public Set<NodeTask> getPendingTasksByType(String taskPrefix) {
-    return getTasks(getPendingTaskIds(taskPrefix));
-  }
-
-  public synchronized Set<NodeTask> getTasks(Set<Protos.TaskID> taskIds) {
-    Set<NodeTask> nodeTasks = new HashSet<>();
-    if (CollectionUtils.isNotEmpty(taskIds) && CollectionUtils.isNotEmpty(tasks.values())) {
-      for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
-        if (taskIds.contains(entry.getKey())) {
-          nodeTasks.add(entry.getValue());
-        }
-      }
-    }
-    return Collections.unmodifiableSet(nodeTasks);
-  }
-
-  public synchronized Collection<Protos.TaskID> getActiveTaskIDsForProfile(ServiceResourceProfile serviceProfile) {
-    List<Protos.TaskID> activeTaskIDs = new ArrayList<>();
-    Set<Protos.TaskID> activeTaskIds = getActiveTaskIds();
-    if (CollectionUtils.isNotEmpty(activeTaskIds) && CollectionUtils.isNotEmpty(tasks.values())) {
-      for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
-        NodeTask nodeTask = entry.getValue();
-        if (activeTaskIds.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) {
-          activeTaskIDs.add(entry.getKey());
-        }
-      }
-    }
-    return Collections.unmodifiableCollection(activeTaskIDs);
-  }
-
-  // TODO (sdaingade) Clone NodeTask
-  public synchronized NodeTask getNodeTask(SlaveID slaveId, String taskPrefix) {
-    if (taskPrefix == null) {
-      return null;
-    }
-    for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
-      final NodeTask task = entry.getValue();
-      if (task.getSlaveId() != null &&
-          task.getSlaveId().equals(slaveId) &&
-          taskPrefix.equals(task.getTaskPrefix())) {
-        return entry.getValue();
-      }
-    }
-    return null;
-  }
-
-  public synchronized Set<NodeTask> getNodeTasks(SlaveID slaveId) {
-    Set<NodeTask> nodeTasks = Sets.newHashSet();
-    for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
-      final NodeTask task = entry.getValue();
-      if (task.getSlaveId() != null && task.getSlaveId().equals(slaveId)) {
-        nodeTasks.add(entry.getValue());
-      }
-    }
-    return nodeTasks;
-  }
-
-  public Set<Protos.TaskID> getStagingTaskIds() {
-    Set<Protos.TaskID> returnSet = new HashSet<>();
-    for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) {
-      returnSet.addAll(entry.getValue().getStagingTaskIds());
-    }
-    return returnSet;
-  }
-
-  public synchronized Collection<Protos.TaskID> getStagingTaskIDsForProfile(ServiceResourceProfile serviceProfile) {
-    List<Protos.TaskID> stagingTaskIDs = new ArrayList<>();
-
-    Set<Protos.TaskID> stagingTasks = getStagingTaskIds();
-    for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
-      NodeTask nodeTask = entry.getValue();
-      if (stagingTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) {
-        stagingTaskIDs.add(entry.getKey());
-      }
-    }
-    return Collections.unmodifiableCollection(stagingTaskIDs);
-  }
-
-  public Set<Protos.TaskID> getStagingTaskIds(String taskPrefix) {
-    SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
-    return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getStagingTaskIds());
-  }
-
-  public Set<Protos.TaskID> getLostTaskIds() {
-    Set<Protos.TaskID> returnSet = new HashSet<>();
-    for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) {
-      returnSet.addAll(entry.getValue().getLostTaskIds());
-    }
-    return returnSet;
-  }
-
-  public Set<Protos.TaskID> getLostTaskIds(String taskPrefix) {
-    SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
-    return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getLostTaskIds());
-  }
-
-  // TODO (sdaingade) Currently cannot return unmodifiableCollection
-  // as this will break ReconcileService code
-  public synchronized Collection<Protos.TaskStatus> getTaskStatuses() {
-    Collection<Protos.TaskStatus> taskStatuses = new ArrayList<>(this.tasks.size());
-    Collection<NodeTask> tasks = this.tasks.values();
-    for (NodeTask task : tasks) {
-      Protos.TaskStatus taskStatus = task.getTaskStatus();
-      if (taskStatus != null) {
-        taskStatuses.add(taskStatus);
-      }
-    }
-
-    return taskStatuses;
-  }
-
-  public synchronized boolean hasTask(Protos.TaskID taskID) {
-    return this.tasks.containsKey(taskID);
-  }
-
-  public synchronized Protos.FrameworkID getFrameworkID() {
-    return this.frameworkId;
-  }
-
-  public synchronized void setFrameworkId(Protos.FrameworkID newFrameworkId) {
-    this.frameworkId = newFrameworkId;
-    updateStateStore();
-  }
-
-  private synchronized void updateStateStore() {
-    if (this.stateStore == null) {
-      LOGGER.debug("Could not update state to state store as HA is disabled");
-      return;
-    }
-
-    try {
-      StoreContext sc = new StoreContext(frameworkId, tasks, getPendingTaskIds(), getStagingTaskIds(), getActiveTaskIds(), getLostTaskIds(), getKillableTasks());
-      stateStore.storeMyriadState(sc);
-    } catch (Exception e) {
-      LOGGER.error("Failed to update scheduler state to state store", e);
-    }
-  }
-
-  private synchronized void loadStateStore() {
-    if (this.stateStore == null) {
-      LOGGER.debug("Could not load state from state store as HA is disabled");
-      return;
-    }
-
-    try {
-      StoreContext sc = stateStore.loadMyriadState();
-      if (sc != null) {
-        this.frameworkId = sc.getFrameworkId();
-        this.tasks.putAll(sc.getTasks());
-        convertToThis(TaskState.PENDING, sc.getPendingTasks());
-        convertToThis(TaskState.STAGING, sc.getStagingTasks());
-        convertToThis(TaskState.ACTIVE, sc.getActiveTasks());
-        convertToThis(TaskState.LOST, sc.getLostTasks());
-        convertToThis(TaskState.KILLABLE, sc.getKillableTasks());
-        LOGGER.info("Loaded Myriad state from state store successfully.");
-        LOGGER.debug("State Store state includes " +
-            "frameworkId: {}, pending tasks count: {}, staging tasks count: {} " +
-            "active tasks count: {}, lost tasks count: {}, " +
-            "and killable tasks count: {}", frameworkId.getValue(), this.getPendingTaskIds().size(), this.getStagingTaskIds().size(), this.getActiveTaskIds().size(), this.getLostTaskIds().size(), this.getKillableTasks().size());
-      }
-    } catch (Exception e) {
-      LOGGER.error("Failed to read scheduler state from state store", e);
-    }
-  }
-
-  private void convertToThis(TaskState taskType, Set<Protos.TaskID> taskIds) {
-    for (Protos.TaskID taskId : taskIds) {
-      String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
-      SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
-      if (taskTypeState == null) {
-        taskTypeState = new SchedulerStateForType(taskPrefix);
-        statesForTaskType.put(taskPrefix, taskTypeState);
-      }
-      switch (taskType) {
-        case PENDING:
-          taskTypeState.makeTaskPending(taskId);
-          break;
-        case STAGING:
-          taskTypeState.makeTaskStaging(taskId);
-          break;
-        case ACTIVE:
-          taskTypeState.makeTaskActive(taskId);
-          break;
-        case KILLABLE:
-          taskTypeState.makeTaskKillable(taskId);
-          break;
-        case LOST:
-          taskTypeState.makeTaskLost(taskId);
-          break;
-      }
-    }
-  }
-
-  /**
-   * Class to keep all the tasks states for a particular taskPrefix together
-   */
-  private static class SchedulerStateForType {
-
-    private final String taskPrefix;
-    private Set<Protos.TaskID> pendingTasks;
-    private Set<Protos.TaskID> stagingTasks;
-    private Set<Protos.TaskID> activeTasks;
-    private Set<Protos.TaskID> lostTasks;
-    private Set<Protos.TaskID> killableTasks;
-
-    public SchedulerStateForType(String taskPrefix) {
-      this.taskPrefix = taskPrefix;
-      // Since Sets.newConcurrentHashSet is available only starting form Guava version 15
-      // and so far (Hadoop 2.7) uses guava 13 we can not easily use it
-      this.pendingTasks = Collections.newSetFromMap(new ConcurrentHashMap<Protos.TaskID, Boolean>());
-      this.stagingTasks = Collections.newSetFromMap(new ConcurrentHashMap<Protos.TaskID, Boolean>());
-      this.activeTasks = Collections.newSetFromMap(new ConcurrentHashMap<Protos.TaskID, Boolean>());
-      this.lostTasks = Collections.newSetFromMap(new ConcurrentHashMap<Protos.TaskID, Boolean>());
-      this.killableTasks = Collections.newSetFromMap(new ConcurrentHashMap<Protos.TaskID, Boolean>());
-
-    }
-
-    @SuppressWarnings("unused")
-    public String getTaskPrefix() {
-      return taskPrefix;
-    }
-
-    public synchronized void makeTaskPending(Protos.TaskID taskId) {
-      Objects.requireNonNull(taskId, "taskId cannot be empty or null");
-
-      pendingTasks.add(taskId);
-      stagingTasks.remove(taskId);
-      activeTasks.remove(taskId);
-      lostTasks.remove(taskId);
-      killableTasks.remove(taskId);
-    }
-
-    public synchronized void makeTaskStaging(Protos.TaskID taskId) {
-      Objects.requireNonNull(taskId, "taskId cannot be empty or null");
-      pendingTasks.remove(taskId);
-      stagingTasks.add(taskId);
-      activeTasks.remove(taskId);
-      lostTasks.remove(taskId);
-      killableTasks.remove(taskId);
-    }
-
-    public synchronized void makeTaskActive(Protos.TaskID taskId) {
-      Objects.requireNonNull(taskId, "taskId cannot be empty or null");
-      pendingTasks.remove(taskId);
-      stagingTasks.remove(taskId);
-      activeTasks.add(taskId);
-      lostTasks.remove(taskId);
-      killableTasks.remove(taskId);
-    }
-
-    public synchronized void makeTaskLost(Protos.TaskID taskId) {
-      Objects.requireNonNull(taskId, "taskId cannot be empty or null");
-      pendingTasks.remove(taskId);
-      stagingTasks.remove(taskId);
-      activeTasks.remove(taskId);
-      lostTasks.add(taskId);
-      killableTasks.remove(taskId);
-    }
-
-    public synchronized void makeTaskKillable(Protos.TaskID taskId) {
-      Objects.requireNonNull(taskId, "taskId cannot be empty or null");
-      pendingTasks.remove(taskId);
-      stagingTasks.remove(taskId);
-      activeTasks.remove(taskId);
-      lostTasks.remove(taskId);
-      killableTasks.add(taskId);
-    }
-
-    public synchronized void removeTask(Protos.TaskID taskId) {
-      this.pendingTasks.remove(taskId);
-      this.stagingTasks.remove(taskId);
-      this.activeTasks.remove(taskId);
-      this.lostTasks.remove(taskId);
-      this.killableTasks.remove(taskId);
-    }
-
-    public synchronized Set<Protos.TaskID> getPendingTaskIds() {
-      return Collections.unmodifiableSet(this.pendingTasks);
-    }
-
-    public Set<Protos.TaskID> getActiveTaskIds() {
-      return Collections.unmodifiableSet(this.activeTasks);
-    }
-
-    public synchronized Set<Protos.TaskID> getStagingTaskIds() {
-      return Collections.unmodifiableSet(this.stagingTasks);
-    }
-
-    public synchronized Set<Protos.TaskID> getLostTaskIds() {
-      return Collections.unmodifiableSet(this.lostTasks);
-    }
-
-    public synchronized Set<Protos.TaskID> getKillableTasks() {
-      return Collections.unmodifiableSet(this.killableTasks);
-    }
-
-  }
-
-  /**
-   * TaskState type
-   */
-  public enum TaskState {
-    PENDING,
-    STAGING,
-    ACTIVE,
-    KILLABLE,
-    LOST
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
deleted file mode 100644
index fcf2cb8..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.ebay.myriad.state.utils;
-
-import com.ebay.myriad.scheduler.constraints.Constraint;
-import com.ebay.myriad.scheduler.constraints.Constraint.Type;
-import com.ebay.myriad.scheduler.constraints.LikeConstraint;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.mesos.Protos;
-
-import com.ebay.myriad.scheduler.ServiceResourceProfile;
-import com.ebay.myriad.state.NodeTask;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.protobuf.GeneratedMessage;
-
-/**
- * ByteBuffer support for the Serialization of the StoreContext
- */
-public class ByteBufferSupport {
-
-  public static final int INT_SIZE = Integer.SIZE / Byte.SIZE;
-  public static final String UTF8 = "UTF-8";
-  public static final byte[] ZERO_BYTES = new byte[0];
-  private static Gson gson = new Gson();
-  private static Gson gsonCustom = new 
GsonBuilder().registerTypeAdapter(ServiceResourceProfile.class, new 
ServiceResourceProfile.CustomDeserializer()).create();
-
-  public static void addByteBuffers(List<ByteBuffer> list, 
ByteArrayOutputStream bytes) throws IOException {
-    // If list, add the list size, then the size of each buffer followed by 
the buffer.
-    if (list != null) {
-      bytes.write(toIntBytes(list.size()));
-      for (ByteBuffer bb : list) {
-        addByteBuffer(bb, bytes);
-      }
-    } else {
-      bytes.write(toIntBytes(0));
-    }
-  }
-
-  public static void addByteBuffer(ByteBuffer bb, ByteArrayOutputStream bytes) 
throws IOException {
-    if (bb != null && bytes != null) {
-      bytes.write(toIntBytes(bb.array().length));
-      bytes.write(bb.array());
-    }
-  }
-
-  public static ByteBuffer toByteBuffer(Protos.TaskID taskId) {
-    return toBuffer(taskId);
-  }
-
-  public static ByteBuffer toByteBuffer(Protos.FrameworkID frameworkId) {
-    return toBuffer(frameworkId);
-  }
-
-  /*
-   * Common method to convert Protobuf object to ByteBuffer 
-   */
-  public static ByteBuffer toBuffer(GeneratedMessage message) {
-    byte dst[];
-    int size;
-    if (message != null) {
-      size = message.getSerializedSize() + INT_SIZE;
-      dst = message.toByteArray();
-    } else {
-      size = INT_SIZE;
-      dst = ZERO_BYTES;
-    }
-    ByteBuffer bb = createBuffer(size);
-    putBytes(bb, dst);
-    bb.rewind();
-    return bb;
-  }
-
-  public static byte[] toIntBytes(int src) {
-    ByteBuffer bb = createBuffer(INT_SIZE);
-    bb.putInt(src);
-    return bb.array();
-  }
-
-
-  public static ByteBuffer toByteBuffer(NodeTask nt) {
-    // Determine the size of ByteBuffer to allocate
-    // The ServiceResourceProfile toString() returns Json, if this ever 
changes then this
-    // will fail. Json is expected.
-    byte[] profile = toBytes(nt.getProfile().toString());
-    int size = profile.length + INT_SIZE;
-
-    Constraint constraint = nt.getConstraint();
-    Constraint.Type type = constraint == null ? Type.NULL : 
constraint.getType();
-    size += INT_SIZE;
-
-    byte[] constraintBytes = ZERO_BYTES;
-    if (constraint != null) {
-      constraintBytes = toBytes(constraint.toString());
-      size += constraintBytes.length + INT_SIZE;
-    } else {
-      size += INT_SIZE;
-    }
-
-    byte[] hostname = toBytes(nt.getHostname());
-    size += hostname.length + INT_SIZE;
-
-    if (nt.getSlaveId() != null) {
-      size += nt.getSlaveId().getSerializedSize() + INT_SIZE;
-    } else {
-      size += INT_SIZE;
-    }
-
-    if (nt.getTaskStatus() != null) {
-      size += nt.getTaskStatus().getSerializedSize() + INT_SIZE;
-    } else {
-      size += INT_SIZE;
-    }
-
-    if (nt.getExecutorInfo() != null) {
-      size += nt.getExecutorInfo().getSerializedSize() + INT_SIZE;
-    } else {
-      size += INT_SIZE;
-    }
-
-    byte[] taskPrefixBytes = ZERO_BYTES;
-    if (nt.getTaskPrefix() != null) {
-      taskPrefixBytes = toBytes(nt.getTaskPrefix());
-      size += taskPrefixBytes.length + INT_SIZE;
-    }
-
-    // Allocate and populate the buffer.
-    ByteBuffer bb = createBuffer(size);
-    putBytes(bb, profile);
-    bb.putInt(type.ordinal());
-    putBytes(bb, constraintBytes);
-    putBytes(bb, hostname);
-    putBytes(bb, getSlaveBytes(nt));
-    putBytes(bb, getTaskBytes(nt));
-    putBytes(bb, getExecutorInfoBytes(nt));
-    putBytes(bb, taskPrefixBytes);
-    // Make sure the buffer is at the beginning
-    bb.rewind();
-    return bb;
-  }
-
-  /**
-   * Assumes the entire ByteBuffer is a TaskID.
-   *
-   * @param bb
-   * @return Protos.TaskID
-   */
-  public static Protos.TaskID toTaskId(ByteBuffer bb) {
-    try {
-      return Protos.TaskID.parseFrom(getBytes(bb, bb.getInt()));
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to parse Task ID", e);
-    }
-  }
-
-  /**
-   * Assumes the entire ByteBuffer is a FrameworkID.
-   *
-   * @param bb
-   * @return Protos.FrameworkID
-   */
-  public static Protos.FrameworkID toFrameworkID(ByteBuffer bb) {
-    try {
-      return Protos.FrameworkID.parseFrom(getBytes(bb, bb.getInt()));
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to parse Framework ID", e);
-    }
-  }
-
-  /**
-   * ByteBuffer is expected to have a NodeTask at its next position.
-   *
-   * @param bb
-   * @return NodeTask or null if buffer is empty. Can throw a RuntimeException
-   * if the buffer is not formatted correctly.
-   */
-  public static NodeTask toNodeTask(ByteBuffer bb) {
-    NodeTask nt = null;
-    if (bb != null && bb.array().length > 0) {
-      nt = new NodeTask(getServiceResourceProfile(bb), getConstraint(bb));
-      nt.setHostname(toString(bb));
-      nt.setSlaveId(toSlaveId(bb));
-      nt.setTaskStatus(toTaskStatus(bb));
-      nt.setExecutorInfo(toExecutorInfo(bb));
-    }
-    return nt;
-  }
-
-  public static byte[] getTaskBytes(NodeTask nt) {
-    if (nt.getTaskStatus() != null) {
-      return nt.getTaskStatus().toByteArray();
-    } else {
-      return ZERO_BYTES;
-    }
-  }
-
-  public static byte[] getExecutorInfoBytes(NodeTask nt) {
-    if (nt.getExecutorInfo() != null) {
-      return nt.getExecutorInfo().toByteArray();
-    } else {
-      return ZERO_BYTES;
-    }
-  }
-
-  public static byte[] getSlaveBytes(NodeTask nt) {
-    if (nt.getSlaveId() != null) {
-      return nt.getSlaveId().toByteArray();
-    } else {
-      return ZERO_BYTES;
-    }
-  }
-
-  public static void putBytes(ByteBuffer bb, byte bytes[]) {
-    if (bytes != null && bytes.length > 0) {
-      bb.putInt(bytes.length);
-      bb.put(bytes);
-    } else {
-      bb.putInt(0);
-    }
-  }
-
-  public static byte[] getBytes(ByteBuffer bb, int size) {
-    byte bytes[] = new byte[size];
-    bb.get(bytes);
-    return bytes;
-  }
-
-  /**
-   * This assumes the next position is the size as an int, and the following 
is a string
-   * iff the size is not zero.
-   *
-   * @param bb ByteBuffer to extract string from
-   * @return string from the next position, or "" if the size is zero
-   */
-  public static String toString(ByteBuffer bb) {
-    byte[] bytes = new byte[bb.getInt()];
-    String s = "";
-    try {
-      if (bytes.length > 0) {
-        bb.get(bytes);
-        s = new String(bytes, UTF8);
-      }
-    } catch (Exception e) {
-      throw new RuntimeException("ByteBuffer not in expected format," + " 
failed to parse string bytes", e);
-    }
-    return s;
-  }
-
-  public static byte[] toBytes(String s) {
-    try {
-      return s.getBytes(UTF8);
-    } catch (Exception e) {
-      return ZERO_BYTES;
-    }
-  }
-
-  public static ServiceResourceProfile getServiceResourceProfile(ByteBuffer 
bb) {
-    String p = toString(bb);
-    if (!StringUtils.isEmpty(p)) {
-      return gsonCustom.fromJson(p, ServiceResourceProfile.class);
-    } else {
-      return null;
-    }
-  }
-
-  public static Constraint getConstraint(ByteBuffer bb) {
-    Constraint.Type type = Constraint.Type.values()[bb.getInt()];
-    String p = toString(bb);
-    switch (type) {
-      case NULL:
-        return null;
-
-      case LIKE:
-
-        if (!StringUtils.isEmpty(p)) {
-          return gson.fromJson(p, LikeConstraint.class);
-        }
-    }
-    return null;
-  }
-
-  public static Protos.SlaveID toSlaveId(ByteBuffer bb) {
-    int size = bb.getInt();
-    if (size > 0) {
-      try {
-        return Protos.SlaveID.parseFrom(getBytes(bb, size));
-      } catch (Exception e) {
-        throw new RuntimeException("ByteBuffer not in expected format," + " 
failed to parse SlaveId bytes", e);
-      }
-    } else {
-      return null;
-    }
-  }
-
-  public static Protos.TaskStatus toTaskStatus(ByteBuffer bb) {
-    int size = bb.getInt();
-    if (size > 0) {
-      try {
-        return Protos.TaskStatus.parseFrom(getBytes(bb, size));
-      } catch (Exception e) {
-        throw new RuntimeException("ByteBuffer not in expected format," + " 
failed to parse TaskStatus bytes", e);
-      }
-    } else {
-      return null;
-    }
-  }
-
-  public static Protos.ExecutorInfo toExecutorInfo(ByteBuffer bb) {
-    int size = bb.getInt();
-    if (size > 0) {
-      try {
-        return Protos.ExecutorInfo.parseFrom(getBytes(bb, size));
-      } catch (Exception e) {
-        throw new RuntimeException("ByteBuffer not in expected format," + " 
failed to parse ExecutorInfo bytes", e);
-      }
-    } else {
-      return null;
-    }
-  }
-
-  public static ByteBuffer fillBuffer(byte src[]) {
-    ByteBuffer bb = createBuffer(src.length);
-    bb.put(src);
-    bb.rewind();
-    return bb;
-  }
-
-  public static List<ByteBuffer> createBufferList(ByteBuffer bb, int size) {
-    List<ByteBuffer> list = new ArrayList<ByteBuffer>(size);
-    for (int i = 0; i < size; i++) {
-      list.add(fillBuffer(getBytes(bb, bb.getInt())));
-    }
-    return list;
-  }
-
-  private static ByteBuffer createBuffer(int size) {
-    return ByteBuffer.allocate(size).order(ByteOrder.LITTLE_ENDIAN);
-  }
-
-  public static ByteBuffer createBuffer(ByteBuffer bb) {
-    return fillBuffer(getBytes(bb, bb.getInt()));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java
deleted file mode 100644
index 2ffcaa7..0000000
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.ebay.myriad.state.utils;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.TaskID;
-
-import com.ebay.myriad.state.NodeTask;
-
-/**
- * The purpose of this container/utility is to create a mechanism to serialize 
the SchedulerState
- * to RMStateStore and back. Json did not seem to handle the Protos fields 
very well so this was an
- * alternative approach.
- */
-public final class StoreContext {
-  private static Pattern taskIdPattern = Pattern.compile("\\.");
-  private ByteBuffer frameworkId;
-  private List<ByteBuffer> taskIds;
-  private List<ByteBuffer> taskNodes;
-  private List<ByteBuffer> pendingTasks;
-  private List<ByteBuffer> stagingTasks;
-  private List<ByteBuffer> activeTasks;
-  private List<ByteBuffer> lostTasks;
-  private List<ByteBuffer> killableTasks;
-
-  public StoreContext() {
-  }
-
-  /**
-   * Accept all the SchedulerState maps and flatten them into lists of 
ByteBuffers
-   *
-   * @param tasks
-   * @param pendingTasks
-   * @param stagingTasks
-   * @param activeTasks
-   * @param lostTasks
-   * @param killableTasks
-   */
-  public StoreContext(Protos.FrameworkID frameworkId, Map<Protos.TaskID, 
NodeTask> tasks, Set<Protos.TaskID> pendingTasks, Set<Protos.TaskID> 
stagingTasks, Set<Protos.TaskID> activeTasks, Set<Protos.TaskID> lostTasks, 
Set<Protos.TaskID>
-      killableTasks) {
-    setFrameworkId(frameworkId);
-    setTasks(tasks);
-    setPendingTasks(pendingTasks);
-    setStagingTasks(stagingTasks);
-    setActiveTasks(activeTasks);
-    setLostTasks(lostTasks);
-    setKillableTasks(killableTasks);
-  }
-
-  /**
-   * Accept list of ByteBuffers and re-create the SchedulerState maps.
-   *
-   * @param framwrorkId
-   * @param taskIds
-   * @param taskNodes
-   * @param pendingTasks
-   * @param stagingTasks
-   * @param activeTasks
-   * @param lostTasks
-   * @param killableTasks
-   */
-  public StoreContext(ByteBuffer frameworkId, List<ByteBuffer> taskIds, 
List<ByteBuffer> taskNodes, List<ByteBuffer> pendingTasks, List<ByteBuffer> 
stagingTasks, List<ByteBuffer> activeTasks, List<ByteBuffer> lostTasks, 
List<ByteBuffer>
-      killableTasks) {
-    this.frameworkId = frameworkId;
-    this.taskIds = taskIds;
-    this.taskNodes = taskNodes;
-    this.pendingTasks = pendingTasks;
-    this.stagingTasks = stagingTasks;
-    this.activeTasks = activeTasks;
-    this.lostTasks = lostTasks;
-    this.killableTasks = killableTasks;
-  }
-
-  /**
-   * Use this to gather bytes to push to the state store
-   *
-   * @return byte stream of the state store context.
-   * @throws IOException
-   */
-  public ByteArrayOutputStream toSerializedContext() throws IOException {
-    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-    ByteBufferSupport.addByteBuffer(frameworkId, bytes);
-    ByteBufferSupport.addByteBuffers(taskIds, bytes);
-    ByteBufferSupport.addByteBuffers(taskNodes, bytes);
-    ByteBufferSupport.addByteBuffers(pendingTasks, bytes);
-    ByteBufferSupport.addByteBuffers(stagingTasks, bytes);
-    ByteBufferSupport.addByteBuffers(activeTasks, bytes);
-    ByteBufferSupport.addByteBuffers(lostTasks, bytes);
-    ByteBufferSupport.addByteBuffers(killableTasks, bytes);
-    return bytes;
-  }
-
-  /**
-   * When the bytes come back from the store, use this method to create a new 
context.
-   *
-   * @param bytes from state store
-   * @return initialized StoreContext to use to initialize a SchedulerState
-   */
-  @SuppressWarnings("unchecked")
-  public static StoreContext fromSerializedBytes(byte bytes[]) {
-    StoreContext ctx;
-    if (bytes != null && bytes.length > 0) {
-      ByteBuffer bb = ByteBufferSupport.fillBuffer(bytes);
-      ByteBuffer frameworkId = ByteBufferSupport.createBuffer(bb);
-      List<ByteBuffer> taskIds = ByteBufferSupport.createBufferList(bb, 
bb.getInt());
-      List<ByteBuffer> taskNodes = ByteBufferSupport.createBufferList(bb, 
bb.getInt());
-      List<ByteBuffer> pendingTasks = ByteBufferSupport.createBufferList(bb, 
bb.getInt());
-      List<ByteBuffer> stagingTasks = ByteBufferSupport.createBufferList(bb, 
bb.getInt());
-      List<ByteBuffer> activeTasks = ByteBufferSupport.createBufferList(bb, 
bb.getInt());
-      List<ByteBuffer> lostTasks = ByteBufferSupport.createBufferList(bb, 
bb.getInt());
-      List<ByteBuffer> killableTasks = ByteBufferSupport.createBufferList(bb, 
bb.getInt());
-      ctx = new StoreContext(frameworkId, taskIds, taskNodes, pendingTasks, 
stagingTasks, activeTasks, lostTasks, killableTasks);
-    } else {
-      ctx = new StoreContext();
-    }
-    return ctx;
-  }
-
-  /**
-   * Serialize tasks into internal ByteBuffers, removing the map.
-   *
-   * @param tasks
-   */
-  public void setTasks(Map<Protos.TaskID, NodeTask> tasks) {
-    taskIds = new ArrayList<ByteBuffer>(tasks.size());
-    taskNodes = new ArrayList<ByteBuffer>(tasks.size());
-    for (Entry<TaskID, NodeTask> entry : tasks.entrySet()) {
-      taskIds.add(ByteBufferSupport.toByteBuffer(entry.getKey()));
-      taskNodes.add(ByteBufferSupport.toByteBuffer(entry.getValue()));
-    }
-  }
-
-  /**
-   * De-serialize the internal ByteBuffer back into a Protos.FrameworkID.
-   *
-   * @return
-   */
-  public Protos.FrameworkID getFrameworkId() {
-    return ByteBufferSupport.toFrameworkID(frameworkId);
-  }
-
-  /**
-   * Serialize the Protos.FrameworkID into a ByteBuffer.
-   */
-  public void setFrameworkId(Protos.FrameworkID frameworkId) {
-    if (frameworkId != null) {
-      this.frameworkId = ByteBufferSupport.toByteBuffer(frameworkId);
-    }
-  }
-
-  /**
-   * De-serialize the internal ByteBuffers back into a Task map.
-   *
-   * @return
-   */
-  public Map<Protos.TaskID, NodeTask> getTasks() {
-    Map<Protos.TaskID, NodeTask> map = null;
-    if (taskIds != null) {
-      map = new HashMap<Protos.TaskID, NodeTask>(taskIds.size());
-      int idx = 0;
-      for (ByteBuffer bb : taskIds) {
-        final Protos.TaskID taskId = ByteBufferSupport.toTaskId(bb);
-        final NodeTask task = 
ByteBufferSupport.toNodeTask(taskNodes.get(idx++));
-        if (task.getTaskPrefix() == null && taskId != null) {
-          String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
-          task.setTaskPrefix(taskPrefix);
-        }
-        map.put(taskId, task);
-      }
-    } else {
-      map = new HashMap<Protos.TaskID, NodeTask>(0);
-    }
-    return map;
-  }
-
-  public void setPendingTasks(Set<Protos.TaskID> tasks) {
-    if (tasks != null) {
-      pendingTasks = new ArrayList<ByteBuffer>(tasks.size());
-      toTaskBuffer(tasks, pendingTasks);
-    }
-  }
-
-  public Set<Protos.TaskID> getPendingTasks() {
-    return toTaskSet(pendingTasks);
-  }
-
-  public void setStagingTasks(Set<Protos.TaskID> tasks) {
-    if (tasks != null) {
-      stagingTasks = new ArrayList<ByteBuffer>(tasks.size());
-      toTaskBuffer(tasks, stagingTasks);
-    }
-  }
-
-  public Set<Protos.TaskID> getStagingTasks() {
-    return toTaskSet(stagingTasks);
-  }
-
-  public void setActiveTasks(Set<Protos.TaskID> tasks) {
-    if (tasks != null) {
-      activeTasks = new ArrayList<ByteBuffer>(tasks.size());
-      toTaskBuffer(tasks, activeTasks);
-    }
-  }
-
-  public Set<Protos.TaskID> getActiveTasks() {
-    return toTaskSet(activeTasks);
-  }
-
-  public void setLostTasks(Set<Protos.TaskID> tasks) {
-    if (tasks != null) {
-      lostTasks = new ArrayList<ByteBuffer>(tasks.size());
-      toTaskBuffer(tasks, lostTasks);
-    }
-  }
-
-  public Set<Protos.TaskID> getLostTasks() {
-    return toTaskSet(lostTasks);
-  }
-
-  public void setKillableTasks(Set<Protos.TaskID> tasks) {
-    if (tasks != null) {
-      killableTasks = new ArrayList<ByteBuffer>(tasks.size());
-      toTaskBuffer(tasks, killableTasks);
-    }
-  }
-
-  public Set<Protos.TaskID> getKillableTasks() {
-    return toTaskSet(killableTasks);
-  }
-
-  private void toTaskBuffer(Set<Protos.TaskID> src, List<ByteBuffer> tgt) {
-    for (Protos.TaskID id : src) {
-      tgt.add(ByteBufferSupport.toByteBuffer(id));
-    }
-  }
-
-  private Set<Protos.TaskID> toTaskSet(List<ByteBuffer> src) {
-    Set<Protos.TaskID> tasks;
-    if (src != null) {
-      tasks = new HashSet<Protos.TaskID>(src.size());
-      for (int i = 0; i < src.size(); i++) {
-        tasks.add(ByteBufferSupport.toTaskId(src.get(i)));
-      }
-    } else {
-      tasks = new HashSet<Protos.TaskID>(0);
-    }
-    return tasks;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/HttpConnectorProvider.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/HttpConnectorProvider.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/HttpConnectorProvider.java
deleted file mode 100644
index a52b310..0000000
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/HttpConnectorProvider.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.webapp;
-
-import com.ebay.myriad.configuration.MyriadConfiguration;
-import com.google.inject.Provider;
-import org.mortbay.jetty.Connector;
-import org.mortbay.jetty.nio.SelectChannelConnector;
-
-import javax.inject.Inject;
-
-/**
- * Guice provider that builds the http connector used by the myriad scheduler.
- */
-public class HttpConnectorProvider implements Provider<Connector> {
-
-  private MyriadConfiguration configuration;
-
-  @Inject
-  public HttpConnectorProvider(MyriadConfiguration myriadConf) {
-    configuration = myriadConf;
-  }
-
-  /**
-   * @return a new connector named "Myriad" on host 0.0.0.0 and the configured REST API port
-   */
-  @Override
-  public Connector get() {
-    final SelectChannelConnector connector = new SelectChannelConnector();
-    connector.setPort(configuration.getRestApiPort());
-    connector.setHost("0.0.0.0");
-    connector.setName("Myriad");
-    return connector;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadServletModule.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadServletModule.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadServletModule.java
deleted file mode 100644
index cea22d1..0000000
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadServletModule.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.webapp;
-
-import com.ebay.myriad.api.ClustersResource;
-import com.ebay.myriad.api.ConfigurationResource;
-import com.ebay.myriad.api.SchedulerStateResource;
-import com.google.inject.Scopes;
-import com.google.inject.servlet.ServletModule;
-import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
-import org.codehaus.jackson.jaxrs.JacksonJaxbJsonProvider;
-
-/**
- * The guice module for configuring the myriad dashboard.
- * <p/>
- * configureServlets() binds ClustersResource, ConfigurationResource and SchedulerStateResource
- * (presumably the REST resources of the API; their definitions are not visible here), binds
- * GuiceContainer, binds JacksonJaxbJsonProvider in singleton scope, and serves every request
- * matching "/api/*" with GuiceContainer.
- */
-public class MyriadServletModule extends ServletModule {
-
-  @Override
-  protected void configureServlets() {
-    bind(ClustersResource.class);
-    bind(ConfigurationResource.class);
-    bind(SchedulerStateResource.class);
-
-    bind(GuiceContainer.class);
-    bind(JacksonJaxbJsonProvider.class).in(Scopes.SINGLETON);
-
-    serve("/api/*").with(GuiceContainer.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadWebServer.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadWebServer.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadWebServer.java
deleted file mode 100644
index 2b54d4c..0000000
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadWebServer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.webapp;
-
-import com.google.inject.servlet.GuiceFilter;
-import org.mortbay.jetty.Connector;
-import org.mortbay.jetty.Handler;
-import org.mortbay.jetty.Server;
-import org.mortbay.jetty.servlet.*;
-
-import javax.inject.Inject;
-
-/**
- * The myriad web server configuration for jetty
- */
-public class MyriadWebServer {
-  private final Server jetty;
-  private final Connector connector;
-  private final GuiceFilter filter;
-
-  @Inject
-  public MyriadWebServer(Server jetty, Connector connector, GuiceFilter filter) {
-    this.jetty = jetty;
-    this.connector = connector;
-    this.filter = filter;
-  }
-
-  /**
-   * Wires the connector, the Guice filter (mapped to "/*" for all dispatch types) and a default
-   * servlet serving the static files under the classpath resource "webapp/public" into jetty,
-   * then starts the server.
-   *
-   * @throws IllegalStateException if "webapp/public" is not on the classpath
-   * @throws Exception             if jetty fails to start
-   */
-  public void start() throws Exception {
-    // Resolve the static content first so a packaging problem is reported clearly and before
-    // jetty has been modified, instead of surfacing as a bare NullPointerException.
-    java.net.URL staticResources = this.getClass().getClassLoader().getResource("webapp/public");
-    if (staticResources == null) {
-      throw new IllegalStateException("Static web resources not found on classpath: webapp/public");
-    }
-    String staticDir = staticResources.toExternalForm();
-
-    this.jetty.addConnector(connector);
-
-    ServletHandler servletHandler = new ServletHandler();
-
-    String filterName = "MyriadGuiceFilter";
-    FilterHolder holder = new FilterHolder(filter);
-    holder.setName(filterName);
-
-    FilterMapping filterMapping = new FilterMapping();
-    filterMapping.setPathSpec("/*");
-    filterMapping.setDispatches(Handler.ALL);
-    filterMapping.setFilterName(filterName);
-
-    servletHandler.addFilter(holder, filterMapping);
-
-    Context context = new Context();
-    context.setServletHandler(servletHandler);
-    context.addServlet(DefaultServlet.class, "/");
-    context.setResourceBase(staticDir);
-
-    this.jetty.addHandler(context);
-    this.jetty.start();
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/WebAppGuiceModule.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/WebAppGuiceModule.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/WebAppGuiceModule.java
deleted file mode 100644
index 861dc00..0000000
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/WebAppGuiceModule.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.webapp;
-
-import com.google.inject.AbstractModule;
-import org.mortbay.jetty.Connector;
-
-/**
- * The guice web application configuration.
- * <p/>
- * configure() binds Connector to the HttpConnectorProvider provider and installs
- * MyriadServletModule.
- */
-public class WebAppGuiceModule extends AbstractModule {
-
-  @Override
-  protected void configure() {
-    bind(Connector.class).toProvider(HttpConnectorProvider.class);
-    install(new MyriadServletModule());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java
 
b/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java
index 227229c..4991df2 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java
@@ -28,8 +28,8 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.ebay.myriad.state.MyriadStateStore;
-import com.ebay.myriad.state.utils.StoreContext;
+import org.apache.myriad.state.MyriadStateStore;
+import org.apache.myriad.state.utils.StoreContext;
 
 /**
  * StateStore that stores Myriad state in addition to RM state to DFS.

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/DisruptorManager.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/DisruptorManager.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/DisruptorManager.java
new file mode 100644
index 0000000..698f615
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/DisruptorManager.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad;
+
+import org.apache.myriad.scheduler.event.handlers.DisconnectedEventHandler;
+import org.apache.myriad.scheduler.event.handlers.ErrorEventHandler;
+import org.apache.myriad.scheduler.event.handlers.ExecutorLostEventHandler;
+import org.apache.myriad.scheduler.event.handlers.FrameworkMessageEventHandler;
+import org.apache.myriad.scheduler.event.handlers.OfferRescindedEventHandler;
+import org.apache.myriad.scheduler.event.handlers.ReRegisteredEventHandler;
+import org.apache.myriad.scheduler.event.handlers.RegisteredEventHandler;
+import org.apache.myriad.scheduler.event.handlers.ResourceOffersEventHandler;
+import org.apache.myriad.scheduler.event.handlers.SlaveLostEventHandler;
+import org.apache.myriad.scheduler.event.handlers.StatusUpdateEventHandler;
+import com.google.inject.Injector;
+import com.lmax.disruptor.dsl.Disruptor;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * The LMAX Disruptor is an event bus used in high speed financial systems;
+ * see http://martinfowler.com/articles/lmax.html.
+ * Here it is used to abstract incoming events.
+ */
+public class DisruptorManager {
+  private ExecutorService disruptorExecutors; // executor handed to every disruptor; created in init()
+  // Ring buffer sizes passed to the Disruptor constructors in init().
+  private static final int DEFAULT_SMALL_RINGBUFFER_SIZE = 64;
+  private static final int DEFAULT_LARGE_RINGBUFFER_SIZE = 1024;
+  // One disruptor per event type; each field is assigned in init() and is null before that.
+  private Disruptor<org.apache.myriad.scheduler.event.RegisteredEvent> 
registeredEventDisruptor;
+  private Disruptor<org.apache.myriad.scheduler.event.ReRegisteredEvent> 
reRegisteredEventDisruptor;
+  private Disruptor<org.apache.myriad.scheduler.event.ResourceOffersEvent> 
resourceOffersEventDisruptor;
+  private Disruptor<org.apache.myriad.scheduler.event.OfferRescindedEvent> 
offerRescindedEventDisruptor;
+  private Disruptor<org.apache.myriad.scheduler.event.StatusUpdateEvent> 
statusUpdateEventDisruptor;
+  private Disruptor<org.apache.myriad.scheduler.event.FrameworkMessageEvent> 
frameworkMessageEventDisruptor;
+  private Disruptor<org.apache.myriad.scheduler.event.DisconnectedEvent> 
disconnectedEventDisruptor;
+  private Disruptor<org.apache.myriad.scheduler.event.SlaveLostEvent> 
slaveLostEventDisruptor;
+  private Disruptor<org.apache.myriad.scheduler.event.ExecutorLostEvent> 
executorLostEventDisruptor;
+  private Disruptor<org.apache.myriad.scheduler.event.ErrorEvent> 
errorEventDisruptor;
+  /** Creates, wires and starts one disruptor per event type; each handler is obtained from the given injector. */
+  @SuppressWarnings("unchecked")
+  public void init(Injector injector) {
+    this.disruptorExecutors = Executors.newCachedThreadPool(); // one cached pool is shared by all ten disruptors
+
+    // todo:  (kensipe) need to make ringsize configurable (overriding the 
defaults)
+
+    // The two registration event types use the small ring buffer.
+    this.registeredEventDisruptor = new Disruptor<>(new 
org.apache.myriad.scheduler.event.RegisteredEventFactory(), 
DEFAULT_SMALL_RINGBUFFER_SIZE, disruptorExecutors);
+    
this.registeredEventDisruptor.handleEventsWith(injector.getInstance(RegisteredEventHandler.class));
+    this.registeredEventDisruptor.start();
+
+    this.reRegisteredEventDisruptor = new Disruptor<>(new 
org.apache.myriad.scheduler.event.ReRegisteredEventFactory(), 
DEFAULT_SMALL_RINGBUFFER_SIZE, disruptorExecutors);
+    
this.reRegisteredEventDisruptor.handleEventsWith(injector.getInstance(ReRegisteredEventHandler.class));
+    this.reRegisteredEventDisruptor.start();
+
+    // Every remaining event type uses the large ring buffer.
+    this.resourceOffersEventDisruptor = new Disruptor<>(new 
org.apache.myriad.scheduler.event.ResourceOffersEventFactory(), 
DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
+    
this.resourceOffersEventDisruptor.handleEventsWith(injector.getInstance(ResourceOffersEventHandler.class));
+    this.resourceOffersEventDisruptor.start();
+
+    this.offerRescindedEventDisruptor = new Disruptor<>(new 
org.apache.myriad.scheduler.event.OfferRescindedEventFactory(), 
DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
+    
this.offerRescindedEventDisruptor.handleEventsWith(injector.getInstance(OfferRescindedEventHandler.class));
+    this.offerRescindedEventDisruptor.start();
+
+    this.statusUpdateEventDisruptor = new Disruptor<>(new 
org.apache.myriad.scheduler.event.StatusUpdateEventFactory(), 
DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
+    
this.statusUpdateEventDisruptor.handleEventsWith(injector.getInstance(StatusUpdateEventHandler.class));
+    this.statusUpdateEventDisruptor.start();
+
+    this.frameworkMessageEventDisruptor = new Disruptor<>(new 
org.apache.myriad.scheduler.event.FrameworkMessageEventFactory(), 
DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
+    
this.frameworkMessageEventDisruptor.handleEventsWith(injector.getInstance(FrameworkMessageEventHandler.class));
+    this.frameworkMessageEventDisruptor.start();
+
+    this.disconnectedEventDisruptor = new Disruptor<>(new 
org.apache.myriad.scheduler.event.DisconnectedEventFactory(), 
DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
+    
this.disconnectedEventDisruptor.handleEventsWith(injector.getInstance(DisconnectedEventHandler.class));
+    this.disconnectedEventDisruptor.start();
+
+    this.slaveLostEventDisruptor = new Disruptor<>(new 
org.apache.myriad.scheduler.event.SlaveLostEventFactory(), 
DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
+    
this.slaveLostEventDisruptor.handleEventsWith(injector.getInstance(SlaveLostEventHandler.class));
+    this.slaveLostEventDisruptor.start();
+
+    this.executorLostEventDisruptor = new Disruptor<>(new 
org.apache.myriad.scheduler.event.ExecutorLostEventFactory(), 
DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
+    
this.executorLostEventDisruptor.handleEventsWith(injector.getInstance(ExecutorLostEventHandler.class));
+    this.executorLostEventDisruptor.start();
+
+    this.errorEventDisruptor = new Disruptor<>(new 
org.apache.myriad.scheduler.event.ErrorEventFactory(), 
DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
+    
this.errorEventDisruptor.handleEventsWith(injector.getInstance(ErrorEventHandler.class));
+    this.errorEventDisruptor.start();
+  }
+  /** Returns the disruptor for RegisteredEvent; null until init() has been called. */
+  public Disruptor<org.apache.myriad.scheduler.event.RegisteredEvent> 
getRegisteredEventDisruptor() {
+    return registeredEventDisruptor;
+  }
+  /** Returns the disruptor for ReRegisteredEvent; null until init() has been called. */
+  public Disruptor<org.apache.myriad.scheduler.event.ReRegisteredEvent> 
getReRegisteredEventDisruptor() {
+    return reRegisteredEventDisruptor;
+  }
+  /** Returns the disruptor for ResourceOffersEvent; null until init() has been called. */
+  public Disruptor<org.apache.myriad.scheduler.event.ResourceOffersEvent> 
getResourceOffersEventDisruptor() {
+    return resourceOffersEventDisruptor;
+  }
+  /** Returns the disruptor for OfferRescindedEvent; null until init() has been called. */
+  public Disruptor<org.apache.myriad.scheduler.event.OfferRescindedEvent> 
getOfferRescindedEventDisruptor() {
+    return offerRescindedEventDisruptor;
+  }
+  /** Returns the disruptor for StatusUpdateEvent; null until init() has been called. */
+  public Disruptor<org.apache.myriad.scheduler.event.StatusUpdateEvent> 
getStatusUpdateEventDisruptor() {
+    return statusUpdateEventDisruptor;
+  }
+  /** Returns the disruptor for FrameworkMessageEvent; null until init() has been called. */
+  public Disruptor<org.apache.myriad.scheduler.event.FrameworkMessageEvent> 
getFrameworkMessageEventDisruptor() {
+    return frameworkMessageEventDisruptor;
+  }
+  /** Returns the disruptor for DisconnectedEvent; null until init() has been called. */
+  public Disruptor<org.apache.myriad.scheduler.event.DisconnectedEvent> 
getDisconnectedEventDisruptor() {
+    return disconnectedEventDisruptor;
+  }
+  /** Returns the disruptor for SlaveLostEvent; null until init() has been called. */
+  public Disruptor<org.apache.myriad.scheduler.event.SlaveLostEvent> 
getSlaveLostEventDisruptor() {
+    return slaveLostEventDisruptor;
+  }
+  /** Returns the disruptor for ExecutorLostEvent; null until init() has been called. */
+  public Disruptor<org.apache.myriad.scheduler.event.ExecutorLostEvent> 
getExecutorLostEventDisruptor() {
+    return executorLostEventDisruptor;
+  }
+  /** Returns the disruptor for ErrorEvent; null until init() has been called. */
+  public Disruptor<org.apache.myriad.scheduler.event.ErrorEvent> 
getErrorEventDisruptor() {
+    return errorEventDisruptor;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/Main.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/Main.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/Main.java
new file mode 100644
index 0000000..0f305e8
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/Main.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad;
+
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.health.HealthCheckRegistry;
+import org.apache.myriad.configuration.ServiceConfiguration;
+import org.apache.myriad.configuration.MyriadBadConfigurationException;
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.health.MesosDriverHealthCheck;
+import org.apache.myriad.health.MesosMasterHealthCheck;
+import org.apache.myriad.health.ZookeeperHealthCheck;
+import org.apache.myriad.scheduler.NMProfile;
+import org.apache.myriad.scheduler.ServiceProfileManager;
+import org.apache.myriad.scheduler.ServiceResourceProfile;
+import org.apache.myriad.scheduler.TaskFactory;
+import org.apache.myriad.webapp.WebAppGuiceModule;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.HashSet;
+
+/**
+ * Main entry point for myriad scheduler
+ */
+public class Main {
+  private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
+
+  private org.apache.myriad.webapp.MyriadWebServer webServer;
+  private ScheduledExecutorService terminatorService;
+
+  private ScheduledExecutorService rebalancerService;
+  private HealthCheckRegistry healthCheckRegistry;
+
+  private static Injector injector;
+
+  public static void initialize(Configuration hadoopConf, 
AbstractYarnScheduler yarnScheduler, RMContext rmContext, 
org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry registry) 
throws Exception {
+    ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+    MyriadConfiguration cfg = 
mapper.readValue(Thread.currentThread().getContextClassLoader().getResource("myriad-config-default.yml"),
 MyriadConfiguration.class);
+
+    MyriadModule myriadModule = new MyriadModule(cfg, hadoopConf, 
yarnScheduler, rmContext, registry);
+    MesosModule mesosModule = new MesosModule();
+    injector = Guice.createInjector(myriadModule, mesosModule, new 
WebAppGuiceModule());
+
+    new Main().run(cfg);
+  }
+
+  // TODO (Kannan Rajah) Hack to get injector in unit test. Returns the static injector set by initialize(); null before that.
+  public static Injector getInjector() {
+    return injector;
+  }
+  /** Starts every Myriad component in a fixed order, using the static injector that initialize() sets. */
+  public void run(MyriadConfiguration cfg) throws Exception {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Bindings: " + injector.getAllBindings());
+    }
+    // NOTE(review): the JMX reporter is attached to a new MetricRegistry that this method does not keep a reference to.
+    JmxReporter.forRegistry(new MetricRegistry()).build().start();
+    // Set-up phase: web app, health checks, profiles, nmInstances validation, service configs, disruptors.
+    initWebApp(injector);
+    initHealthChecks(injector);
+    initProfiles(injector);
+    validateNMInstances(injector);
+    initServiceConfigurations(cfg, injector);
+    initDisruptors(injector);
+    // Background services and the Mesos driver are started before NM and service instances are launched.
+    initRebalancerService(cfg, injector);
+    initTerminatorService(injector);
+    startMesosDriver(injector);
+    startNMInstances(injector);
+    startJavaBasedTaskInstance(injector);
+  }
+
+
+  private void startMesosDriver(Injector injector) {
+    LOGGER.info("starting mesosDriver..");
+    
injector.getInstance(org.apache.myriad.scheduler.MyriadDriverManager.class).startDriver();
+    LOGGER.info("started mesosDriver..");
+  }
+
+  /**
+   * Brings up the embedded jetty webserver for serving REST APIs.
+   *
+   * @param injector Guice injector used to obtain the MyriadWebServer, which is kept in the webServer field
+   * @throws Exception if starting the web server fails
+   */
+  private void initWebApp(Injector injector) throws Exception {
+    webServer = 
injector.getInstance(org.apache.myriad.webapp.MyriadWebServer.class);
+    webServer.start();
+  }
+
+  /**
+   * Initializes health checks.
+   *
+   * @param injector
+   */
+  private void initHealthChecks(Injector injector) {
+    LOGGER.info("Initializing HealthChecks");
+    healthCheckRegistry = new HealthCheckRegistry();
+    healthCheckRegistry.register(MesosMasterHealthCheck.NAME, 
injector.getInstance(MesosMasterHealthCheck.class));
+    healthCheckRegistry.register(ZookeeperHealthCheck.NAME, 
injector.getInstance(ZookeeperHealthCheck.class));
+    healthCheckRegistry.register(MesosDriverHealthCheck.NAME, 
injector.getInstance(MesosDriverHealthCheck.class));
+  }
+
+  /**
+   * Registers the NodeManager task constraints and turns every entry of the configured
+   * {@code profiles} map into a service resource profile.
+   *
+   * A profile is accepted only when its own resource map is non-empty and defines both
+   * {@code cpu} and {@code mem}. Any other profile, including one whose resource map is missing,
+   * is reported through the logger and skipped.
+   *
+   * @param injector Guice injector supplying the managers, the configuration and the task utils
+   * @throws NumberFormatException if a {@code cpu} or {@code mem} value cannot be parsed as a long
+   */
+  private void initProfiles(Injector injector) {
+    LOGGER.info("Initializing Profiles");
+    ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class);
+    org.apache.myriad.scheduler.TaskConstraintsManager taskConstraintsManager =
+        injector.getInstance(org.apache.myriad.scheduler.TaskConstraintsManager.class);
+    taskConstraintsManager.addTaskConstraints(
+        org.apache.myriad.configuration.NodeManagerConfiguration.NM_TASK_PREFIX,
+        new TaskFactory.NMTaskConstraints());
+    Map<String, Map<String, String>> profiles = injector.getInstance(MyriadConfiguration.class).getProfiles();
+    org.apache.myriad.scheduler.TaskUtils taskUtils =
+        injector.getInstance(org.apache.myriad.scheduler.TaskUtils.class);
+    if (MapUtils.isNotEmpty(profiles)) {
+      for (Map.Entry<String, Map<String, String>> profile : profiles.entrySet()) {
+        Map<String, String> profileResourceMap = profile.getValue();
+        // Validate this profile's own resource map. The outer map is always non-empty inside the
+        // loop, so testing it let a null resource map reach containsKey and throw.
+        if (MapUtils.isNotEmpty(profileResourceMap)
+            && profileResourceMap.containsKey("cpu") && profileResourceMap.containsKey("mem")) {
+          Long cpu = Long.parseLong(profileResourceMap.get("cpu"));
+          Long mem = Long.parseLong(profileResourceMap.get("mem"));
+
+          ServiceResourceProfile serviceProfile = new org.apache.myriad.scheduler.ExtendedResourceProfile(
+              new NMProfile(profile.getKey(), cpu, mem),
+              taskUtils.getNodeManagerCpus(), taskUtils.getNodeManagerMemory());
+          serviceProfile.setExecutorCpu(taskUtils.getExecutorCpus());
+          serviceProfile.setExecutorMemory(taskUtils.getExecutorMemory());
+
+          profileManager.add(serviceProfile);
+        } else {
+          LOGGER.error("Invalid definition for profile: " + profile.getKey());
+        }
+      }
+    }
+  }
+  /** Throws RuntimeException if 'nmInstances' names an unknown profile, or if the largest-cpu profile with a positive instance count lacks positive cpu and mem (also the case when no count is positive). */
+  private void validateNMInstances(Injector injector) {
+    LOGGER.info("Validating nmInstances..");
+    Map<String, Integer> nmInstances = 
injector.getInstance(MyriadConfiguration.class).getNmInstances();
+    ServiceProfileManager profileManager = 
injector.getInstance(ServiceProfileManager.class);
+    // Long.MIN_VALUE means no profile with a positive instance count has been seen yet.
+    long maxCpu = Long.MIN_VALUE;
+    long maxMem = Long.MIN_VALUE;
+    for (Map.Entry<String, Integer> entry : nmInstances.entrySet()) {
+      String profile = entry.getKey();
+      ServiceResourceProfile nodeManager = profileManager.get(profile);
+      if (nodeManager == null) {
+        throw new RuntimeException("Invalid profile name '" + profile + "' 
specified in 'nmInstances'");
+      }
+      if (entry.getValue() > 0) {
+        if (nodeManager.getCpus() > maxCpu) { // find the profile with largest 
number of cpus
+          maxCpu = nodeManager.getCpus().longValue();
+          maxMem = nodeManager.getMemory().longValue(); // use the memory from 
the same profile
+        }
+      }
+    }
+    if (maxCpu <= 0 || maxMem <= 0) {
+      throw new RuntimeException("Please configure 'nmInstances' with at least 
one instance/profile " + "with non-zero cpu/mem resources.");
+    }
+  }
+
+  private void startNMInstances(Injector injector) {
+    Map<String, Integer> nmInstances = 
injector.getInstance(MyriadConfiguration.class).getNmInstances();
+    org.apache.myriad.scheduler.MyriadOperations myriadOperations = 
injector.getInstance(org.apache.myriad.scheduler.MyriadOperations.class);
+    ServiceProfileManager profileManager = 
injector.getInstance(ServiceProfileManager.class);
+    org.apache.myriad.state.SchedulerState schedulerState = 
injector.getInstance(org.apache.myriad.state.SchedulerState.class);
+
+    Set<org.apache.myriad.state.NodeTask> launchedNMTasks = new HashSet<>();
+    
launchedNMTasks.addAll(schedulerState.getPendingTasksByType(org.apache.myriad.configuration.NodeManagerConfiguration.NM_TASK_PREFIX));
+    if (!launchedNMTasks.isEmpty()) {
+      LOGGER.info("{} NM(s) in pending state. Not launching additional NMs", 
launchedNMTasks.size());
+      return;
+    }
+
+    
launchedNMTasks.addAll(schedulerState.getStagingTasksByType(org.apache.myriad.configuration.NodeManagerConfiguration.NM_TASK_PREFIX));
+    if (!launchedNMTasks.isEmpty()) {
+      LOGGER.info("{} NM(s) in staging state. Not launching additional NMs", 
launchedNMTasks.size());
+      return;
+    }
+
+    
launchedNMTasks.addAll(schedulerState.getActiveTasksByType(org.apache.myriad.configuration.NodeManagerConfiguration.NM_TASK_PREFIX));
+    if (!launchedNMTasks.isEmpty()) {
+      LOGGER.info("{} NM(s) in active state. Not launching additional NMs", 
launchedNMTasks.size());
+      return;
+    }
+
+    for (Map.Entry<String, Integer> entry : nmInstances.entrySet()) {
+      LOGGER.info("Launching {} NM(s) with profile {}", entry.getValue(), 
entry.getKey());
+      myriadOperations.flexUpCluster(profileManager.get(entry.getKey()), 
entry.getValue(), null);
+    }
+  }
+
+  /**
+   * Registers a resource profile and task constraints for every configured service.
+   *
+   * The service map is read from the injected {@code MyriadConfiguration}. Cpu and memory are
+   * resolved with {@code or(...)} against {@code ServiceConfiguration.DEFAULT_CPU} and
+   * {@code ServiceConfiguration.DEFAULT_MEMORY}. Nothing is registered when the map is null.
+   *
+   * @param cfg      configuration handed to each {@code ServiceTaskConstraints}
+   * @param injector Guice injector supplying the managers and the configuration
+   */
+  private void initServiceConfigurations(MyriadConfiguration cfg, Injector injector) {
+    LOGGER.info("Initializing initServiceConfigurations");
+    final ServiceProfileManager profiles = injector.getInstance(ServiceProfileManager.class);
+    final org.apache.myriad.scheduler.TaskConstraintsManager constraints =
+        injector.getInstance(org.apache.myriad.scheduler.TaskConstraintsManager.class);
+
+    final Map<String, ServiceConfiguration> services =
+        injector.getInstance(MyriadConfiguration.class).getServiceConfigurations();
+    if (services == null) {
+      return;
+    }
+
+    for (Map.Entry<String, ServiceConfiguration> service : services.entrySet()) {
+      final String taskPrefix = service.getKey();
+      final ServiceConfiguration serviceConfig = service.getValue();
+      final Double cpu = serviceConfig.getCpus().or(ServiceConfiguration.DEFAULT_CPU);
+      final Double mem = serviceConfig.getJvmMaxMemoryMB().or(ServiceConfiguration.DEFAULT_MEMORY);
+
+      profiles.add(new ServiceResourceProfile(taskPrefix, cpu, mem));
+      constraints.addTaskConstraints(taskPrefix,
+          new org.apache.myriad.scheduler.ServiceTaskConstraints(cfg, taskPrefix));
+    }
+  }
+
+  /**
+   * Schedules the injected {@code TaskTerminator} on a single-threaded scheduled executor:
+   * first run after 100 ms, then at a fixed rate of 2000 ms.
+   *
+   * @param injector Guice injector supplying the terminator
+   */
+  private void initTerminatorService(Injector injector) {
+    LOGGER.info("Initializing Terminator");
+    final int initialDelay = 100;
+    final int period = 2000;
+    terminatorService = Executors.newScheduledThreadPool(1);
+    terminatorService.scheduleAtFixedRate(
+        injector.getInstance(org.apache.myriad.scheduler.TaskTerminator.class),
+        initialDelay,
+        period,
+        TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * When the rebalancer is enabled in the configuration, schedules the injected
+   * {@code Rebalancer} on a single-threaded scheduled executor: first run after 100 ms, then at
+   * a fixed rate of 5000 ms. Otherwise only logs that it is turned off.
+   *
+   * @param cfg      configuration whose {@code isRebalancer()} flag is consulted
+   * @param injector Guice injector supplying the rebalancer
+   */
+  private void initRebalancerService(MyriadConfiguration cfg, Injector injector) {
+    if (!cfg.isRebalancer()) {
+      LOGGER.info("Rebalancer is not turned on");
+      return;
+    }
+
+    LOGGER.info("Initializing Rebalancer");
+    final int initialDelay = 100;
+    final int period = 5000;
+    rebalancerService = Executors.newScheduledThreadPool(1);
+    rebalancerService.scheduleAtFixedRate(
+        injector.getInstance(org.apache.myriad.scheduler.Rebalancer.class),
+        initialDelay,
+        period,
+        TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Obtains the {@code DisruptorManager} from the injector and initializes it with that same
+   * injector.
+   *
+   * @param injector Guice injector supplying the manager and passed on to its {@code init}
+   */
+  private void initDisruptors(Injector injector) {
+    LOGGER.info("Initializing Disruptors");
+    injector.getInstance(DisruptorManager.class).init(injector);
+  }
+
+  /**
+   * Start tasks for configured services
+   *
+   * @param injector
+   */
+  private void startJavaBasedTaskInstance(Injector injector) {
+    Map<String, ServiceConfiguration> auxServicesConfigs = 
injector.getInstance(MyriadConfiguration.class).getServiceConfigurations();
+    if (auxServicesConfigs != null) {
+      org.apache.myriad.scheduler.MyriadOperations myriadOperations = 
injector.getInstance(org.apache.myriad.scheduler.MyriadOperations.class);
+      for (Map.Entry<String, ServiceConfiguration> entry : 
auxServicesConfigs.entrySet()) {
+        try {
+          
myriadOperations.flexUpAService(entry.getValue().getMaxInstances().or(1), 
entry.getKey());
+        } catch (MyriadBadConfigurationException e) {
+          LOGGER.warn("Exception while trying to flexup service: {}", 
entry.getKey(), e);
+        }
+      }
+    }
+  }
+}

Reply via email to