Changed agent to send TASK_DROPPED for task launch failures. If the agent cannot launch a task because of any of several possible error conditions, it now sends TASK_DROPPED rather than TASK_LOST to partition-aware frameworks. Frameworks that are not partition-aware continue to receive TASK_LOST for backward compatibility.
Review: https://reviews.apache.org/r/52746/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c4b69ec1 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c4b69ec1 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c4b69ec1 Branch: refs/heads/master Commit: c4b69ec1b8ac798f3f68bf116f596a2d09aeaa15 Parents: 0645e7d Author: Neil Conway <neil.con...@gmail.com> Authored: Fri Oct 21 14:13:30 2016 -0700 Committer: Vinod Kone <vinodk...@gmail.com> Committed: Fri Oct 21 14:13:30 2016 -0700 ---------------------------------------------------------------------- src/slave/slave.cpp | 102 ++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 83 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/c4b69ec1/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index a0019cb..ff3ed02 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -1791,12 +1791,21 @@ void Slave::_run( LOG(ERROR) << "Failed to unschedule directories scheduled for gc: " << (future.isFailed() ? future.failure() : "future discarded"); + // We report TASK_DROPPED to the framework because the task was + // never launched. For non-partition-aware frameworks, we report + // TASK_LOST for backward compatibility. 
+ mesos::TaskState taskState = TASK_DROPPED; + if (!protobuf::frameworkHasCapability( + frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) { + taskState = TASK_LOST; + } + foreach (const TaskInfo& task, tasks) { const StatusUpdate update = protobuf::createStatusUpdate( frameworkId, info.id(), task.task_id(), - TASK_LOST, + taskState, TaskStatus::SOURCE_SLAVE, UUID::random(), "Could not launch the task because we failed to unschedule" @@ -1822,8 +1831,8 @@ void Slave::_run( // NOTE: If the task/task group or executor uses resources that are // checkpointed on the slave (e.g. persistent volumes), we should // already know about it. If the slave doesn't know about them (e.g. - // CheckpointResourcesMessage was dropped or came out of order), - // we send TASK_LOST status updates here since restarting the task + // CheckpointResourcesMessage was dropped or came out of order), we + // send TASK_DROPPED status updates here since restarting the task // may succeed in the event that CheckpointResourcesMessage arrives // out of order. bool kill = false; @@ -1844,12 +1853,21 @@ void Slave::_run( } if (kill) { + // We report TASK_DROPPED to the framework because the task was + // never launched. For non-partition-aware frameworks, we report + // TASK_LOST for backward compatibility. + mesos::TaskState taskState = TASK_DROPPED; + if (!protobuf::frameworkHasCapability( + frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) { + taskState = TASK_LOST; + } + foreach (const TaskInfo& task, tasks) { const StatusUpdate update = protobuf::createStatusUpdate( frameworkId, info.id(), task.task_id(), - TASK_LOST, + taskState, TaskStatus::SOURCE_SLAVE, UUID::random(), "The checkpointed resources being used by the task or task group are " @@ -1884,12 +1902,21 @@ void Slave::_run( } if (kill) { + // We report TASK_DROPPED to the framework because the task was + // never launched. For non-partition-aware frameworks, we report + // TASK_LOST for backward compatibility. 
+ mesos::TaskState taskState = TASK_DROPPED; + if (!protobuf::frameworkHasCapability( + frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) { + taskState = TASK_LOST; + } + foreach (const TaskInfo& task, tasks) { const StatusUpdate update = protobuf::createStatusUpdate( frameworkId, info.id(), task.task_id(), - TASK_LOST, + taskState, TaskStatus::SOURCE_SLAVE, UUID::random(), "The checkpointed resources being used by the executor are unknown " @@ -1960,12 +1987,21 @@ void Slave::_run( << " with executor '" << executorId << "' which is " << executorState; + // We report TASK_DROPPED to the framework because the task was + // never launched. For non-partition-aware frameworks, we report + // TASK_LOST for backward compatibility. + mesos::TaskState taskState = TASK_DROPPED; + if (!protobuf::frameworkHasCapability( + frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) { + taskState = TASK_LOST; + } + foreach (const TaskInfo& task, tasks) { const StatusUpdate update = protobuf::createStatusUpdate( frameworkId, info.id(), task.task_id(), - TASK_LOST, + taskState, TaskStatus::SOURCE_SLAVE, UUID::random(), "Executor " + executorState, @@ -2356,13 +2392,21 @@ void Slave::killTask( LOG(WARNING) << "Cannot kill task " << taskId << " of framework " << frameworkId << " because no corresponding executor is running"; - // We send a TASK_LOST update because this task has never - // been launched on this slave. + + // We send a TASK_DROPPED update because this task has never been + // launched on this slave. If the framework is not partition-aware, + // we send TASK_LOST for backward compatibility. 
+ mesos::TaskState taskState = TASK_DROPPED; + if (!protobuf::frameworkHasCapability( + framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) { + taskState = TASK_LOST; + } + const StatusUpdate update = protobuf::createStatusUpdate( frameworkId, info.id(), taskId, - TASK_LOST, + taskState, TaskStatus::SOURCE_SLAVE, UUID::random(), "Cannot find executor", @@ -3190,27 +3234,37 @@ void Slave::subscribe( // Now, if there is any task still in STAGING state and not in // unacknowledged 'tasks' known to the executor, the slave must // have died before the executor received the task! We should - // transition it to TASK_LOST. We only consider/store + // transition it to TASK_DROPPED. We only consider/store // unacknowledged 'tasks' at the executor driver because if a - // task has been acknowledged, the slave must have received - // an update for that task and transitioned it out of STAGING! + // task has been acknowledged, the slave must have received an + // update for that task and transitioned it out of STAGING! + // // TODO(vinod): Consider checkpointing 'TaskInfo' instead of - // 'Task' so that we can relaunch such tasks! Currently we - // don't do it because 'TaskInfo.data' could be huge. + // 'Task' so that we can relaunch such tasks! Currently we don't + // do it because 'TaskInfo.data' could be huge. + // // TODO(vinod): Use foreachvalue instead once LinkedHashmap // supports it. 
foreach (Task* task, executor->launchedTasks.values()) { if (task->state() == TASK_STAGING && !unackedTasks.contains(task->task_id())) { + mesos::TaskState newTaskState = TASK_DROPPED; + if (!protobuf::frameworkHasCapability( + framework->info, + FrameworkInfo::Capability::PARTITION_AWARE)) { + newTaskState = TASK_LOST; + } + LOG(INFO) << "Transitioning STAGED task " << task->task_id() - << " to LOST because it is unknown to the executor " + << " to " << newTaskState + << " because it is unknown to the executor " << executor->id; const StatusUpdate update = protobuf::createStatusUpdate( framework->id(), info.id(), task->task_id(), - TASK_LOST, + newTaskState, TaskStatus::SOURCE_SLAVE, UUID::random(), "Task launched during agent restart", @@ -3500,27 +3554,37 @@ void Slave::reregisterExecutor( // Now, if there is any task still in STAGING state and not in // unacknowledged 'tasks' known to the executor, the slave must // have died before the executor received the task! We should - // transition it to TASK_LOST. We only consider/store + // transition it to TASK_DROPPED. We only consider/store // unacknowledged 'tasks' at the executor driver because if a // task has been acknowledged, the slave must have received // an update for that task and transitioned it out of STAGING! + // // TODO(vinod): Consider checkpointing 'TaskInfo' instead of // 'Task' so that we can relaunch such tasks! Currently we // don't do it because 'TaskInfo.data' could be huge. + // // TODO(vinod): Use foreachvalue instead once LinkedHashmap // supports it. 
foreach (Task* task, executor->launchedTasks.values()) { if (task->state() == TASK_STAGING && !unackedTasks.contains(task->task_id())) { + mesos::TaskState newTaskState = TASK_DROPPED; + if (!protobuf::frameworkHasCapability( + framework->info, + FrameworkInfo::Capability::PARTITION_AWARE)) { + newTaskState = TASK_LOST; + } + LOG(INFO) << "Transitioning STAGED task " << task->task_id() - << " to LOST because it is unknown to the executor '" + << " to " << newTaskState + << " because it is unknown to the executor '" << executorId << "'"; const StatusUpdate update = protobuf::createStatusUpdate( frameworkId, info.id(), task->task_id(), - TASK_LOST, + newTaskState, TaskStatus::SOURCE_SLAVE, UUID::random(), "Task launched during agent restart",