Additional changes for getting Myriad HA to work

* Myriad Executor + NM (merged) now sends TASK_RUNNING and TASK_FINISHED 
messages to mesos for Mesos tasks
  corresponding to yarn containers. This is independent of the RM.
* Entire ExecutorInfo object for NM tasks is being preserved and recovered from 
the state store.
  This is being done because mesos requires all tasks run on the same executor 
to have the same executor info
  objects. The Myriad Executor + NM (merged) also runs tasks corresponding to 
yarn containers. These tasks also
  need to be provided the same ExecutorInfo object. This ExecutorInfo object 
cannot be obtained across an RM restart
  without being preserved into the state store. Made code changes to store 
ExecutorInfo into the scheduler state and
  serialize and deserialize it to the state store.
* Made sure that the RM's view of NM capacity is updated correctly after an RM 
restart. RM's view is not regenerated
  atomically, so assumptions about data being available are not always true. 
Fixed a few NullPointerExceptions here.

Testing done

* Run a job with one node using Coarse Grained Scaling (CGS) and one flexed up 
node using Fine Grained Scaling (FGS).
  On completion of the job, kill the RM. The RM launches on another node. Delete the output 
directory of the first job and execute
  the same job again.
  This tests:
  1. That the RM successfully recovers the list of NM Tasks that it has 
launched before restart.
  2. The executorInfo is stored and retrieved from the state store.

* Run a long running job. Kill the RM while the job is running. The RM launches on 
another node and the job continues
  to make progress. This tests:
  1. That the RM successfully recovers the list of NM Tasks that it has 
launched before restart.
  2. The executorInfo is stored and retrieved from the state store.
  3. RM recovers and job makes forward progress.


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/0fa49c26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/0fa49c26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/0fa49c26

Branch: refs/heads/phase1
Commit: 0fa49c26bc0492ef4b69a85c219eab53f1cc7f0a
Parents: 23e01ec
Author: Swapnil Daingade <sdaing...@maprtech.com>
Authored: Sat Aug 15 05:23:32 2015 -0700
Committer: Swapnil Daingade <sdaing...@maprtech.com>
Committed: Sat Aug 29 11:41:33 2015 -0700

----------------------------------------------------------------------
 .../executor/MyriadExecutorAuxService.java      | 28 ++++++++-
 .../handlers/ResourceOffersEventHandler.java    | 10 ++++
 .../scheduler/fgs/NMHeartBeatHandler.java       | 60 ++++----------------
 .../scheduler/fgs/YarnNodeCapacityManager.java  |  7 ++-
 .../java/com/ebay/myriad/state/NodeTask.java    | 13 +++++
 .../com/ebay/myriad/state/SchedulerState.java   | 14 ++++-
 .../myriad/state/utils/ByteBufferSupport.java   | 30 ++++++++++
 7 files changed, 109 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java
----------------------------------------------------------------------
diff --git 
a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java
 
b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java
index 2c7d87d..a6d126a 100644
--- 
a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java
+++ 
b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java
@@ -20,12 +20,17 @@ package com.ebay.myriad.executor;
 
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
 import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
 import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
 
 import org.apache.mesos.MesosExecutorDriver;
 import org.apache.mesos.Protos.Status;
+import org.apache.mesos.Protos.TaskState;
+import org.apache.mesos.Protos.TaskStatus;
+import org.apache.mesos.Protos;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +42,9 @@ public class MyriadExecutorAuxService  extends 
AuxiliaryService {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MyriadExecutor.class);
   private static final String SERVICE_NAME = "myriad_service";
+  public static final String YARN_CONTAINER_TASK_ID_PREFIX = "yarn_";
+
+  private MesosExecutorDriver driver;
 
   protected MyriadExecutorAuxService() {
     super(SERVICE_NAME);
@@ -48,7 +56,7 @@ public class MyriadExecutorAuxService  extends 
AuxiliaryService {
 
     new Thread(new Runnable() {
       public void run() {
-        MesosExecutorDriver driver = new MesosExecutorDriver(new 
MyriadExecutor());
+        driver = new MesosExecutorDriver(new MyriadExecutor());
         LOGGER.error("MyriadExecutor exit with status " +
         Integer.toString(driver.run() == Status.DRIVER_STOPPED ? 0 : 1));
       }
@@ -72,4 +80,22 @@ public class MyriadExecutorAuxService  extends 
AuxiliaryService {
     return null;
   }
 
+  @Override
+  public void stopContainer(ContainerTerminationContext stopContainerContext) {
+    sendStatus(stopContainerContext.getContainerId(), TaskState.TASK_FINISHED);
+  }
+
+  private void sendStatus(ContainerId containerId, TaskState taskState) {
+    Protos.TaskID taskId = Protos.TaskID.newBuilder()
+      .setValue(YARN_CONTAINER_TASK_ID_PREFIX + containerId.toString())
+      .build();
+
+    TaskStatus status = TaskStatus.newBuilder()
+      .setTaskId(taskId)
+      .setState(taskState)
+      .build();
+    driver.sendStatusUpdate(status);
+    LOGGER.debug("Sent status " + taskState + " for taskId " + taskId);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
index 51730ac..915bd2f 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
@@ -94,6 +94,7 @@ public class ResourceOffersEventHandler implements 
EventHandler<ResourceOffersEv
                 schedulerState.getActiveTasks())) {
               TaskInfo task = taskFactory.createTask(offer, pendingTaskId,
                   taskToLaunch);
+
               List<OfferID> offerIds = new ArrayList<>();
               offerIds.add(offer.getId());
               List<TaskInfo> tasks = new ArrayList<>();
@@ -104,6 +105,15 @@ public class ResourceOffersEventHandler implements 
EventHandler<ResourceOffersEv
               driver.launchTasks(offerIds, tasks);
               launchedTaskId = pendingTaskId;
 
+              // TODO (sdaingade) For every NM Task that we launch, we 
currently
+              // need to backup the ExecutorInfo for that NM Task in the State 
Store.
+              // Without this, we will not be able to launch tasks 
corresponding to yarn
+              // containers. This is specially important in case the RM 
restarts.
+              if (task.hasExecutor() && taskToLaunch.getExecutorInfo() == 
null) {
+                  taskToLaunch.setExecutorInfo(task.getExecutor());
+                  schedulerState.updateStateStore();
+              }
+
               taskToLaunch.setHostname(offer.getHostname());
               taskToLaunch.setSlaveId(offer.getSlaveId());
               offerMatch = true;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java
index 9fd97ba..47393a4 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java
@@ -1,23 +1,20 @@
 package com.ebay.myriad.scheduler.fgs;
 
-import com.ebay.myriad.executor.ContainerTaskStatusRequest;
 import com.ebay.myriad.scheduler.MyriadDriver;
 import com.ebay.myriad.scheduler.SchedulerUtils;
 import com.ebay.myriad.scheduler.TaskFactory;
 import com.ebay.myriad.scheduler.yarn.interceptor.BaseInterceptor;
 import com.ebay.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
 import com.ebay.myriad.state.SchedulerState;
-import com.google.gson.Gson;
-import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 import javax.inject.Inject;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
@@ -118,8 +115,10 @@ public class NMHeartBeatHandler extends BaseInterceptor {
     RMNode rmNode = context.getRMNodes().get(event.getNodeId());
     String hostName = rmNode.getNodeID().getHost();
 
-    nodeStore.getNode(hostName).snapshotRunningContainers();
-    sendStatusUpdatesToMesosForCompletedContainers(statusEvent);
+    Node host = nodeStore.getNode(hostName);
+    if (host != null) {
+      host.snapshotRunningContainers();
+    }
 
     // New capacity of the node =
     // resources under use on the node (due to previous offers) +
@@ -155,54 +154,17 @@ public class NMHeartBeatHandler extends BaseInterceptor {
     Resource usedResources = Resource.newInstance(0, 0);
     for (ContainerStatus status : statusEvent.getContainers()) {
       if (status.getState() == ContainerState.NEW || status.getState() == 
ContainerState.RUNNING) {
-        Resources.addTo(usedResources, 
yarnScheduler.getRMContainer(status.getContainerId()).getAllocatedResource());
+        RMContainer rmContainer = 
yarnScheduler.getRMContainer(status.getContainerId());
+        // (sdaingade) This check is needed as RMContainer information may not 
be populated
+        // immediately after a RM restart.
+        if (rmContainer != null) {
+          Resources.addTo(usedResources, rmContainer.getAllocatedResource());
+        }
       }
     }
     return usedResources;
   }
 
-  private void 
sendStatusUpdatesToMesosForCompletedContainers(RMNodeStatusEvent statusEvent) {
-    // Send task update to Mesos
-    Protos.SlaveID slaveId = 
nodeStore.getNode(statusEvent.getNodeId().getHost()).getSlaveId();
-    for (ContainerStatus status : statusEvent.getContainers()) {
-      ContainerId containerId = status.getContainerId();
-      if (status.getState() == ContainerState.COMPLETE) {
-        requestExecutorToSendTaskStatusUpdate(slaveId, containerId, 
Protos.TaskState.TASK_FINISHED);
-      } else { // state == NEW | RUNNING
-        requestExecutorToSendTaskStatusUpdate(slaveId, containerId, 
Protos.TaskState.TASK_RUNNING);
-      }
-    }
-  }
-
-
-  /**
-   * sends a request to executor on the given slave to send back a status 
update
-   * for the mesos task launched for this container.
-   *
-   * TODO(Santosh):
-   *  Framework messages are unreliable. Try a NM auxiliary service that can 
help
-   *  send out the status messages from NM itself. NM and MyriadExecutor would 
need
-   *  to be merged into a single process.
-   *
-   * @param slaveId
-   * @param containerId
-   * @param taskState
-   */
-  private void requestExecutorToSendTaskStatusUpdate(Protos.SlaveID slaveId,
-      ContainerId containerId,
-      Protos.TaskState taskState) {
-    final String mesosTaskId = 
ContainerTaskStatusRequest.YARN_CONTAINER_TASK_ID_PREFIX + 
containerId.toString();
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("Sending out framework message requesting the executor to 
send {} status for task: {}",
-          taskState.name(), mesosTaskId);
-    }
-    ContainerTaskStatusRequest containerTaskStatusRequest = new 
ContainerTaskStatusRequest();
-    containerTaskStatusRequest.setMesosTaskId(mesosTaskId);
-    containerTaskStatusRequest.setState(taskState.name());
-    myriadDriver.getDriver().sendFrameworkMessage(getExecutorId(slaveId), 
slaveId,
-        new 
Gson().toJson(containerTaskStatusRequest).getBytes(Charset.defaultCharset()));
-  }
-
   private Protos.ExecutorID getExecutorId(Protos.SlaveID slaveId) {
     return Protos.ExecutorID.newBuilder().setValue(
         TaskFactory.NMTaskFactoryImpl.EXECUTOR_PREFIX + 
slaveId.getValue()).build();

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
index 497b43d..12bbe73 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
@@ -12,6 +12,7 @@ import com.google.common.collect.Sets;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+
 import javax.inject.Inject;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -196,7 +197,7 @@ public class YarnNodeCapacityManager extends 
BaseInterceptor {
   public void setNodeCapacity(RMNode rmNode, Resource newCapacity) {
     rmNode.getTotalCapability().setMemory(newCapacity.getMemory());
     rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores());
-
+    LOGGER.info("Setting capacity for node {} to {}", rmNode.getHostName(), 
newCapacity);
     // updates the scheduler with the new capacity for the NM.
     // the event is handled by the scheduler asynchronously
     rmContext.getDispatcher().getEventHandler().handle(
@@ -213,10 +214,12 @@ public class YarnNodeCapacityManager extends 
BaseInterceptor {
         Protos.TaskID taskId = Protos.TaskID.newBuilder()
             .setValue(ContainerTaskStatusRequest.YARN_CONTAINER_TASK_ID_PREFIX 
+ container.getId().toString()).build();
 
+        // TODO (sdaingade) Remove ExecutorInfo from the Node object
+        // as this is now cached in the NodeTask object in scheduler state.
         Protos.ExecutorInfo executorInfo = node.getExecInfo();
         if (executorInfo == null) {
             executorInfo = Protos.ExecutorInfo.newBuilder(
-                taskFactory.getExecutorInfoForSlave(offer.getSlaveId(), null))
+                 state.getNodeTask(offer.getSlaveId()).getExecutorInfo())
                 .setFrameworkId(offer.getFrameworkId()).build();
             node.setExecInfo(executorInfo);
         }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
index 5b8b87d..8191eed 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
@@ -32,6 +32,11 @@ public class NodeTask {
     @JsonProperty
     private Protos.TaskStatus taskStatus;
 
+    /**
+     * Mesos executor for this node.
+     */
+    private Protos.ExecutorInfo executorInfo;
+
     public NodeTask(NMProfile profile) {
         this.profile = profile;
         this.hostname = "";
@@ -68,4 +73,12 @@ public class NodeTask {
     public void setTaskStatus(Protos.TaskStatus taskStatus) {
         this.taskStatus = taskStatus;
     }
+
+    public Protos.ExecutorInfo getExecutorInfo() {
+        return executorInfo;
+    }
+
+    public void setExecutorInfo(Protos.ExecutorInfo executorInfo) {
+        this.executorInfo = executorInfo;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
index 75503b6..e27e976 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
@@ -31,6 +31,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.SlaveID;
+
 import com.ebay.myriad.state.utils.StoreContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 
@@ -188,6 +190,16 @@ public class SchedulerState {
         return activeNodeTasks;
     }
 
+    public NodeTask getNodeTask(SlaveID slaveId) {
+        for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
+            if (entry.getValue().getSlaveId() != null &&
+                entry.getValue().getSlaveId().equals(slaveId)) {
+                return entry.getValue(); 
+            }
+        }
+        return null;
+    }
+
     public Set<Protos.TaskID> getStagingTaskIds() {
         return this.stagingTasks;
     }
@@ -226,7 +238,7 @@ public class SchedulerState {
         updateStateStore();
     }
 
-    private void updateStateStore() {
+    public void updateStateStore() {
         if (!isMyriadStateStore()) {
             return;
         }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
index e1081f0..3d8d57e 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
@@ -116,6 +116,12 @@ public class ByteBufferSupport {
     } else {
       size += INT_SIZE;
     }
+    
+    if (nt.getExecutorInfo() != null) {
+        size += nt.getExecutorInfo().getSerializedSize() + INT_SIZE;
+    } else {
+        size += INT_SIZE;
+    }
 
     // Allocate and populate the buffer.
     ByteBuffer bb = createBuffer(size);
@@ -123,6 +129,7 @@ public class ByteBufferSupport {
     putBytes(bb, hostname);
     putBytes(bb, getSlaveBytes(nt));
     putBytes(bb, getTaskBytes(nt));
+    putBytes(bb, getExecutorInfoBytes(nt));
     // Make sure the buffer is at the beginning
     bb.rewind();
     return bb;
@@ -170,6 +177,7 @@ public class ByteBufferSupport {
       nt.setHostname(toString(bb));
       nt.setSlaveId(toSlaveId(bb));
       nt.setTaskStatus(toTaskStatus(bb));
+      nt.setExecutorInfo(toExecutorInfo(bb));
     }
     return nt;
   }
@@ -182,6 +190,14 @@ public class ByteBufferSupport {
     }
   }
 
+  public static byte[] getExecutorInfoBytes(NodeTask nt) {
+    if (nt.getExecutorInfo() != null) {
+      return nt.getExecutorInfo().toByteArray();
+    } else {
+      return ZERO_BYTES;
+    }
+  }
+
   public static byte[] getSlaveBytes(NodeTask nt) {
     if (nt.getSlaveId() != null) {
       return nt.getSlaveId().toByteArray();
@@ -272,6 +288,20 @@ public class ByteBufferSupport {
     }
   }
 
+  public static Protos.ExecutorInfo toExecutorInfo(ByteBuffer bb) {
+    int size = bb.getInt();
+    if (size > 0) {
+      try {
+        return Protos.ExecutorInfo.parseFrom(getBytes(bb, size));
+      } catch (Exception e) {
+        throw new RuntimeException("ByteBuffer not in expected format," +
+          " failed to parse ExecutorInfo bytes", e);
+      }
+    } else {
+      return null;
+    }
+  }
+
   public static ByteBuffer fillBuffer(byte src[]) {
     ByteBuffer bb = createBuffer(src.length);
     bb.put(src);

Reply via email to