Changed agent to send TASK_DROPPED for task launch failures.

When the agent cannot launch a task because of one of several error
conditions, it now sends TASK_DROPPED rather than TASK_LOST to
partition-aware frameworks. Frameworks that are not partition-aware
still receive TASK_LOST for backward compatibility.

Review: https://reviews.apache.org/r/52746/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c4b69ec1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c4b69ec1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c4b69ec1

Branch: refs/heads/master
Commit: c4b69ec1b8ac798f3f68bf116f596a2d09aeaa15
Parents: 0645e7d
Author: Neil Conway <neil.con...@gmail.com>
Authored: Fri Oct 21 14:13:30 2016 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Fri Oct 21 14:13:30 2016 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 102 ++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 83 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c4b69ec1/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index a0019cb..ff3ed02 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1791,12 +1791,21 @@ void Slave::_run(
     LOG(ERROR) << "Failed to unschedule directories scheduled for gc: "
                << (future.isFailed() ? future.failure() : "future discarded");
 
+    // We report TASK_DROPPED to the framework because the task was
+    // never launched. For non-partition-aware frameworks, we report
+    // TASK_LOST for backward compatibility.
+    mesos::TaskState taskState = TASK_DROPPED;
+    if (!protobuf::frameworkHasCapability(
+            frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
+      taskState = TASK_LOST;
+    }
+
     foreach (const TaskInfo& task, tasks) {
       const StatusUpdate update = protobuf::createStatusUpdate(
           frameworkId,
           info.id(),
           task.task_id(),
-          TASK_LOST,
+          taskState,
           TaskStatus::SOURCE_SLAVE,
           UUID::random(),
           "Could not launch the task because we failed to unschedule"
@@ -1822,8 +1831,8 @@ void Slave::_run(
   // NOTE: If the task/task group or executor uses resources that are
   // checkpointed on the slave (e.g. persistent volumes), we should
   // already know about it. If the slave doesn't know about them (e.g.
-  // CheckpointResourcesMessage was dropped or came out of order),
-  // we send TASK_LOST status updates here since restarting the task
+  // CheckpointResourcesMessage was dropped or came out of order), we
+  // send TASK_DROPPED status updates here since restarting the task
   // may succeed in the event that CheckpointResourcesMessage arrives
   // out of order.
   bool kill = false;
@@ -1844,12 +1853,21 @@ void Slave::_run(
   }
 
   if (kill) {
+    // We report TASK_DROPPED to the framework because the task was
+    // never launched. For non-partition-aware frameworks, we report
+    // TASK_LOST for backward compatibility.
+    mesos::TaskState taskState = TASK_DROPPED;
+    if (!protobuf::frameworkHasCapability(
+            frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
+      taskState = TASK_LOST;
+    }
+
     foreach (const TaskInfo& task, tasks) {
       const StatusUpdate update = protobuf::createStatusUpdate(
           frameworkId,
           info.id(),
           task.task_id(),
-          TASK_LOST,
+          taskState,
           TaskStatus::SOURCE_SLAVE,
           UUID::random(),
           "The checkpointed resources being used by the task or task group are "
@@ -1884,12 +1902,21 @@ void Slave::_run(
   }
 
   if (kill) {
+    // We report TASK_DROPPED to the framework because the task was
+    // never launched. For non-partition-aware frameworks, we report
+    // TASK_LOST for backward compatibility.
+    mesos::TaskState taskState = TASK_DROPPED;
+    if (!protobuf::frameworkHasCapability(
+            frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
+      taskState = TASK_LOST;
+    }
+
     foreach (const TaskInfo& task, tasks) {
       const StatusUpdate update = protobuf::createStatusUpdate(
           frameworkId,
           info.id(),
           task.task_id(),
-          TASK_LOST,
+          taskState,
           TaskStatus::SOURCE_SLAVE,
           UUID::random(),
           "The checkpointed resources being used by the executor are unknown "
@@ -1960,12 +1987,21 @@ void Slave::_run(
                    << " with executor '" << executorId
                    << "' which is " << executorState;
 
+      // We report TASK_DROPPED to the framework because the task was
+      // never launched. For non-partition-aware frameworks, we report
+      // TASK_LOST for backward compatibility.
+      mesos::TaskState taskState = TASK_DROPPED;
+      if (!protobuf::frameworkHasCapability(
+              frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
+        taskState = TASK_LOST;
+      }
+
       foreach (const TaskInfo& task, tasks) {
         const StatusUpdate update = protobuf::createStatusUpdate(
             frameworkId,
             info.id(),
             task.task_id(),
-            TASK_LOST,
+            taskState,
             TaskStatus::SOURCE_SLAVE,
             UUID::random(),
             "Executor " + executorState,
@@ -2356,13 +2392,21 @@ void Slave::killTask(
     LOG(WARNING) << "Cannot kill task " << taskId
                  << " of framework " << frameworkId
                  << " because no corresponding executor is running";
-    // We send a TASK_LOST update because this task has never
-    // been launched on this slave.
+
+    // We send a TASK_DROPPED update because this task has never been
+    // launched on this slave. If the framework is not partition-aware,
+    // we send TASK_LOST for backward compatibility.
+    mesos::TaskState taskState = TASK_DROPPED;
+    if (!protobuf::frameworkHasCapability(
+            framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+      taskState = TASK_LOST;
+    }
+
     const StatusUpdate update = protobuf::createStatusUpdate(
         frameworkId,
         info.id(),
         taskId,
-        TASK_LOST,
+        taskState,
         TaskStatus::SOURCE_SLAVE,
         UUID::random(),
         "Cannot find executor",
@@ -3190,27 +3234,37 @@ void Slave::subscribe(
       // Now, if there is any task still in STAGING state and not in
       // unacknowledged 'tasks' known to the executor, the slave must
       // have died before the executor received the task! We should
-      // transition it to TASK_LOST. We only consider/store
+      // transition it to TASK_DROPPED. We only consider/store
       // unacknowledged 'tasks' at the executor driver because if a
-      // task has been acknowledged, the slave must have received
-      // an update for that task and transitioned it out of STAGING!
+      // task has been acknowledged, the slave must have received an
+      // update for that task and transitioned it out of STAGING!
+      //
       // TODO(vinod): Consider checkpointing 'TaskInfo' instead of
-      // 'Task' so that we can relaunch such tasks! Currently we
-      // don't do it because 'TaskInfo.data' could be huge.
+      // 'Task' so that we can relaunch such tasks! Currently we don't
+      // do it because 'TaskInfo.data' could be huge.
+      //
       // TODO(vinod): Use foreachvalue instead once LinkedHashmap
       // supports it.
       foreach (Task* task, executor->launchedTasks.values()) {
         if (task->state() == TASK_STAGING &&
             !unackedTasks.contains(task->task_id())) {
+          mesos::TaskState newTaskState = TASK_DROPPED;
+          if (!protobuf::frameworkHasCapability(
+                  framework->info,
+                  FrameworkInfo::Capability::PARTITION_AWARE)) {
+            newTaskState = TASK_LOST;
+          }
+
           LOG(INFO) << "Transitioning STAGED task " << task->task_id()
-                    << " to LOST because it is unknown to the executor "
+                    << " to " << newTaskState
+                    << " because it is unknown to the executor "
                     << executor->id;
 
           const StatusUpdate update = protobuf::createStatusUpdate(
               framework->id(),
               info.id(),
               task->task_id(),
-              TASK_LOST,
+              newTaskState,
               TaskStatus::SOURCE_SLAVE,
               UUID::random(),
               "Task launched during agent restart",
@@ -3500,27 +3554,37 @@ void Slave::reregisterExecutor(
       // Now, if there is any task still in STAGING state and not in
       // unacknowledged 'tasks' known to the executor, the slave must
       // have died before the executor received the task! We should
-      // transition it to TASK_LOST. We only consider/store
+      // transition it to TASK_DROPPED. We only consider/store
       // unacknowledged 'tasks' at the executor driver because if a
       // task has been acknowledged, the slave must have received
       // an update for that task and transitioned it out of STAGING!
+      //
       // TODO(vinod): Consider checkpointing 'TaskInfo' instead of
       // 'Task' so that we can relaunch such tasks! Currently we
       // don't do it because 'TaskInfo.data' could be huge.
+      //
       // TODO(vinod): Use foreachvalue instead once LinkedHashmap
       // supports it.
       foreach (Task* task, executor->launchedTasks.values()) {
         if (task->state() == TASK_STAGING &&
             !unackedTasks.contains(task->task_id())) {
+          mesos::TaskState newTaskState = TASK_DROPPED;
+          if (!protobuf::frameworkHasCapability(
+                  framework->info,
+                  FrameworkInfo::Capability::PARTITION_AWARE)) {
+            newTaskState = TASK_LOST;
+          }
+
           LOG(INFO) << "Transitioning STAGED task " << task->task_id()
-                    << " to LOST because it is unknown to the executor '"
+                    << " to " << newTaskState
+                    << " because it is unknown to the executor '"
                     << executorId << "'";
 
           const StatusUpdate update = protobuf::createStatusUpdate(
               frameworkId,
               info.id(),
               task->task_id(),
-              TASK_LOST,
+              newTaskState,
               TaskStatus::SOURCE_SLAVE,
               UUID::random(),
               "Task launched during agent restart",

Reply via email to