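Implemented the LAUNCH_GROUP Offer::Operation in the master. This operation is all-or-nothing: all tasks in the group must be launched together. If the group fails validation or authorization, every task in the group fails with TASK_ERROR and the corresponding task-group reason (REASON_TASK_GROUP_INVALID or REASON_TASK_GROUP_UNAUTHORIZED). If any task in the group is killed before the group is delivered to the agent, the remaining tasks in the group are also killed (TASK_KILLED).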
Review: https://reviews.apache.org/r/51320 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bf3957f4 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bf3957f4 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bf3957f4 Branch: refs/heads/master Commit: bf3957f4b534a04a0f76456f76c4e28bdee4e76d Parents: 3674c58 Author: Benjamin Mahler <bmah...@apache.org> Authored: Mon Aug 22 20:38:50 2016 -0700 Committer: Benjamin Mahler <bmah...@apache.org> Committed: Wed Aug 24 14:33:39 2016 -0700 ---------------------------------------------------------------------- include/mesos/mesos.proto | 2 + include/mesos/v1/mesos.proto | 2 + src/master/master.cpp | 226 +++++++++++++++++++++++++++++++++++--- src/messages/messages.proto | 15 +++ 4 files changed, 230 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/bf3957f4/include/mesos/mesos.proto ---------------------------------------------------------------------- diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto index 53b6547..a93db55 100644 --- a/include/mesos/mesos.proto +++ b/include/mesos/mesos.proto @@ -1536,6 +1536,8 @@ message TaskStatus { REASON_SLAVE_REMOVED = 11; REASON_SLAVE_RESTARTED = 12; REASON_SLAVE_UNKNOWN = 13; + REASON_TASK_GROUP_INVALID = 25; + REASON_TASK_GROUP_UNAUTHORIZED = 26; REASON_TASK_INVALID = 14; REASON_TASK_UNAUTHORIZED = 15; REASON_TASK_UNKNOWN = 16; http://git-wip-us.apache.org/repos/asf/mesos/blob/bf3957f4/include/mesos/v1/mesos.proto ---------------------------------------------------------------------- diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto index f6b59e1..4a7e998 100644 --- a/include/mesos/v1/mesos.proto +++ b/include/mesos/v1/mesos.proto @@ -1535,6 +1535,8 @@ message TaskStatus { REASON_AGENT_REMOVED = 11; REASON_AGENT_RESTARTED = 12; REASON_AGENT_UNKNOWN = 13; + 
REASON_TASK_GROUP_INVALID = 25; + REASON_TASK_GROUP_UNAUTHORIZED = 26; REASON_TASK_INVALID = 14; REASON_TASK_UNAUTHORIZED = 15; REASON_TASK_UNKNOWN = 16; http://git-wip-us.apache.org/repos/asf/mesos/blob/bf3957f4/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 910293a..c300e1f 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -22,6 +22,7 @@ #include <iomanip> #include <list> #include <memory> +#include <set> #include <sstream> #include <mesos/module.hpp> @@ -89,7 +90,10 @@ #include "watcher/whitelist_watcher.hpp" +using google::protobuf::RepeatedPtrField; + using std::list; +using std::set; using std::shared_ptr; using std::string; using std::vector; @@ -3310,11 +3314,21 @@ void Master::accept( << "': " << error.get().message; foreach (const Offer::Operation& operation, accept.operations()) { - if (operation.type() != Offer::Operation::LAUNCH) { + if (operation.type() != Offer::Operation::LAUNCH && + operation.type() != Offer::Operation::LAUNCH_GROUP) { continue; } - foreach (const TaskInfo& task, operation.launch().task_infos()) { + const RepeatedPtrField<TaskInfo>& tasks = [&]() { + if (operation.type() == Offer::Operation::LAUNCH) { + return operation.launch().task_infos(); + } else if (operation.type() == Offer::Operation::LAUNCH_GROUP) { + return operation.launch_group().task_group().tasks(); + } + UNREACHABLE(); + }(); + + foreach (const TaskInfo& task, tasks) { const StatusUpdate& update = protobuf::createStatusUpdate( framework->id(), task.slave_id(), @@ -3349,10 +3363,20 @@ void Master::accept( list<Future<bool>> futures; foreach (const Offer::Operation& operation, accept.operations()) { switch (operation.type()) { - case Offer::Operation::LAUNCH: { + case Offer::Operation::LAUNCH: + case Offer::Operation::LAUNCH_GROUP: { + const RepeatedPtrField<TaskInfo>& tasks = [&]() { + if (operation.type() == Offer::Operation::LAUNCH) { + 
return operation.launch().task_infos(); + } else if (operation.type() == Offer::Operation::LAUNCH_GROUP) { + return operation.launch_group().task_group().tasks(); + } + UNREACHABLE(); + }(); + // Authorize the tasks. A task is in 'framework->pendingTasks' // before it is authorized. - foreach (const TaskInfo& task, operation.launch().task_infos()) { + foreach (const TaskInfo& task, tasks) { futures.push_back(authorizeTask(task, framework)); // Add to pending tasks. @@ -3371,12 +3395,6 @@ void Master::accept( break; } - case Offer::Operation::LAUNCH_GROUP : { - // TODO(vinod): Implement this. - LOG(WARNING) << "Ignoring unimplemented LAUNCH_GROUP operation"; - break; - } - // NOTE: When handling RESERVE and UNRESERVE operations, authorization // will proceed even if no principal is specified, although currently // resources cannot be reserved or unreserved unless a principal is @@ -3485,11 +3503,21 @@ void Master::_accept( if (slave == nullptr || !slave->connected) { foreach (const Offer::Operation& operation, accept.operations()) { - if (operation.type() != Offer::Operation::LAUNCH) { + if (operation.type() != Offer::Operation::LAUNCH && + operation.type() != Offer::Operation::LAUNCH_GROUP) { continue; } - foreach (const TaskInfo& task, operation.launch().task_infos()) { + const RepeatedPtrField<TaskInfo>& tasks = [&]() { + if (operation.type() == Offer::Operation::LAUNCH) { + return operation.launch().task_infos(); + } else { + CHECK_EQ(Offer::Operation::LAUNCH_GROUP, operation.type()); + return operation.launch_group().task_group().tasks(); + } + }(); + + foreach (const TaskInfo& task, tasks) { const TaskStatus::Reason reason = slave == nullptr ? TaskStatus::REASON_SLAVE_REMOVED : TaskStatus::REASON_SLAVE_DISCONNECTED; @@ -3885,9 +3913,177 @@ void Master::_accept( break; } - case Offer::Operation::LAUNCH_GROUP : { - // TODO(vinod): Implement this. 
- LOG(WARNING) << "Ignoring unimplemented LAUNCH_GROUP operation"; + case Offer::Operation::LAUNCH_GROUP: { + // We must ensure that the entire group can be launched. This + // means all tasks in the group must be authorized and valid. + // If any tasks in the group have been killed in the interim + // we must kill the entire group. + const ExecutorInfo& executor = operation.launch_group().executor(); + const TaskGroupInfo& taskGroup = operation.launch_group().task_group(); + + // Note that we do not fill in the `ExecutorInfo.framework_id` + // since we do not have to support backwards compatiblity like + // in the `Launch` operation case. + + // TODO(bmahler): Consider injecting some default (cpus, mem, disk) + // resources when the framework omits the executor resources. + + // See if there are any validation or authorization errors. + // Note that we'll only report the first error we encounter + // for the group. + Option<Error> error = + validation::task::group::validate( + taskGroup, executor, framework, slave, _offeredResources); + + Option<TaskStatus::Reason> reason = None(); + + if (error.isSome()) { + reason = TaskStatus::REASON_TASK_GROUP_INVALID; + } else { + foreach (const TaskInfo& task, taskGroup.tasks()) { + Future<bool> authorization = authorizations.front(); + authorizations.pop_front(); + + CHECK(!authorization.isDiscarded()); + + if (authorization.isFailed()) { + error = Error("Failed to authorize task" + " '" + stringify(task.task_id()) + "'" + ": " + authorization.failure()); + + reason = TaskStatus::REASON_TASK_GROUP_UNAUTHORIZED; + + break; + } + + if (!authorization.get()) { + string user = framework->info.user(); // Default user. 
+ if (task.has_command() && task.command().has_user()) { + user = task.command().user(); + } + + error = Error("Task '" + stringify(task.task_id()) + "'" + " is not authorized to launch as" + " user '" + user + "'"); + + reason = TaskStatus::REASON_TASK_GROUP_UNAUTHORIZED; + + break; + } + } + } + + if (error.isSome()) { + CHECK_SOME(reason); + + // NOTE: If some of these invalid or unauthorized tasks were + // killed already, here we end up sending a TASK_ERROR after + // having already sent TASK_KILLED. + foreach (const TaskInfo& task, taskGroup.tasks()) { + const StatusUpdate& update = protobuf::createStatusUpdate( + framework->id(), + task.slave_id(), + task.task_id(), + TASK_ERROR, + TaskStatus::SOURCE_MASTER, + None(), + error->message, + reason.get()); + + metrics->tasks_error++; + + metrics->incrementTasksStates( + TASK_ERROR, TaskStatus::SOURCE_MASTER, reason.get()); + + forward(update, UPID(), framework); + } + + break; + } + + // Remove all the tasks from being pending. If any of the tasks + // have been killed in the interim, we will send TASK_KILLED + // for all other tasks in the group, since a TaskGroup must + // be delivered in its entirety. + hashset<TaskID> killed; + foreach (const TaskInfo& task, taskGroup.tasks()) { + bool pending = framework->pendingTasks.contains(task.task_id()); + framework->pendingTasks.erase(task.task_id()); + + if (!pending) { + killed.insert(task.task_id()); + } + } + + // If task(s) were killed, send TASK_KILLED for + // all of the remaining tasks. + // + // TODO(bmahler): Do this killing when processing + // the `Kill` call, rather than doing it here. 
+ if (!killed.empty()) { + foreach (const TaskInfo& task, taskGroup.tasks()) { + if (!killed.contains(task.task_id())) { + const StatusUpdate& update = protobuf::createStatusUpdate( + framework->id(), + task.slave_id(), + task.task_id(), + TASK_KILLED, + TaskStatus::SOURCE_MASTER, + None(), + "A task within the task group was killed before" + " delivery to the agent"); + + metrics->tasks_killed++; + + // TODO(bmahler): Increment the task state source metric, + // we currently cannot because it requires each source + // requires a reason. + + forward(update, UPID(), framework); + } + } + + break; + } + + // Now launch the task group! + RunTaskGroupMessage message; + message.mutable_framework()->CopyFrom(framework->info); + message.mutable_executor()->CopyFrom(executor); + message.mutable_task_group()->CopyFrom(taskGroup); + + set<TaskID> taskIds; + Resources totalResources; + + for (int i = 0; i < message.task_group().tasks().size(); ++i) { + TaskInfo* task = message.mutable_task_group()->mutable_tasks(i); + + taskIds.insert(task->task_id()); + totalResources += task->resources(); + + const Resources consumed = addTask(*task, framework, slave); + + CHECK(_offeredResources.contains(consumed)) + << _offeredResources << " does not contain " << consumed; + + _offeredResources -= consumed; + + if (HookManager::hooksAvailable()) { + // Set labels retrieved from label-decorator hooks. 
+ task->mutable_labels()->CopyFrom( + HookManager::masterLaunchTaskLabelDecorator( + *task, + framework->info, + slave->info)); + } + } + + LOG(INFO) << "Launching task group " << stringify(taskIds) + << " of framework " << *framework + << " with resources " << totalResources + << " on agent " << *slave; + + send(slave->pid, message); break; } http://git-wip-us.apache.org/repos/asf/mesos/blob/bf3957f4/src/messages/messages.proto ---------------------------------------------------------------------- diff --git a/src/messages/messages.proto b/src/messages/messages.proto index 7b5e24f..17bb352 100644 --- a/src/messages/messages.proto +++ b/src/messages/messages.proto @@ -301,6 +301,21 @@ message RunTaskMessage { /** + * This message either notifies an existing executor to run a task + * group, or starts a new executor and runs the task group. This + * message is sent when scheduler::Call::Accept is sent with + * Offer::Operation::LaunchGroup. + * + * See executor::Event::LaunchGroup. + */ +message RunTaskGroupMessage { + required FrameworkInfo framework = 1; + required ExecutorInfo executor = 2; + required TaskGroupInfo task_group = 3; +} + + +/** * Kills a specific task. * * See scheduler::Call::Kill and executor::Event::Kill.