Implemented the LaunchGroup Offer::Operation in the master.

This operation is all-or-nothing, in that all tasks must be
launched together. If the operation fails, all tasks will
fail with TASK_ERROR and the appropriate GROUP reason.
If a task was killed before delivery to the executor, the
remaining tasks in the group are killed as well.

Review: https://reviews.apache.org/r/51320


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bf3957f4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bf3957f4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bf3957f4

Branch: refs/heads/master
Commit: bf3957f4b534a04a0f76456f76c4e28bdee4e76d
Parents: 3674c58
Author: Benjamin Mahler <bmah...@apache.org>
Authored: Mon Aug 22 20:38:50 2016 -0700
Committer: Benjamin Mahler <bmah...@apache.org>
Committed: Wed Aug 24 14:33:39 2016 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto    |   2 +
 include/mesos/v1/mesos.proto |   2 +
 src/master/master.cpp        | 226 +++++++++++++++++++++++++++++++++++---
 src/messages/messages.proto  |  15 +++
 4 files changed, 230 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bf3957f4/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 53b6547..a93db55 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -1536,6 +1536,8 @@ message TaskStatus {
     REASON_SLAVE_REMOVED = 11;
     REASON_SLAVE_RESTARTED = 12;
     REASON_SLAVE_UNKNOWN = 13;
+    REASON_TASK_GROUP_INVALID = 25;
+    REASON_TASK_GROUP_UNAUTHORIZED = 26;
     REASON_TASK_INVALID = 14;
     REASON_TASK_UNAUTHORIZED = 15;
     REASON_TASK_UNKNOWN = 16;

http://git-wip-us.apache.org/repos/asf/mesos/blob/bf3957f4/include/mesos/v1/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index f6b59e1..4a7e998 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -1535,6 +1535,8 @@ message TaskStatus {
     REASON_AGENT_REMOVED = 11;
     REASON_AGENT_RESTARTED = 12;
     REASON_AGENT_UNKNOWN = 13;
+    REASON_TASK_GROUP_INVALID = 25;
+    REASON_TASK_GROUP_UNAUTHORIZED = 26;
     REASON_TASK_INVALID = 14;
     REASON_TASK_UNAUTHORIZED = 15;
     REASON_TASK_UNKNOWN = 16;

http://git-wip-us.apache.org/repos/asf/mesos/blob/bf3957f4/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 910293a..c300e1f 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -22,6 +22,7 @@
 #include <iomanip>
 #include <list>
 #include <memory>
+#include <set>
 #include <sstream>
 
 #include <mesos/module.hpp>
@@ -89,7 +90,10 @@
 
 #include "watcher/whitelist_watcher.hpp"
 
+using google::protobuf::RepeatedPtrField;
+
 using std::list;
+using std::set;
 using std::shared_ptr;
 using std::string;
 using std::vector;
@@ -3310,11 +3314,21 @@ void Master::accept(
                  << "': " << error.get().message;
 
     foreach (const Offer::Operation& operation, accept.operations()) {
-      if (operation.type() != Offer::Operation::LAUNCH) {
+      if (operation.type() != Offer::Operation::LAUNCH &&
+          operation.type() != Offer::Operation::LAUNCH_GROUP) {
         continue;
       }
 
-      foreach (const TaskInfo& task, operation.launch().task_infos()) {
+      const RepeatedPtrField<TaskInfo>& tasks = [&]() {
+        if (operation.type() == Offer::Operation::LAUNCH) {
+          return operation.launch().task_infos();
+        } else if (operation.type() == Offer::Operation::LAUNCH_GROUP) {
+          return operation.launch_group().task_group().tasks();
+        }
+        UNREACHABLE();
+      }();
+
+      foreach (const TaskInfo& task, tasks) {
         const StatusUpdate& update = protobuf::createStatusUpdate(
             framework->id(),
             task.slave_id(),
@@ -3349,10 +3363,20 @@ void Master::accept(
   list<Future<bool>> futures;
   foreach (const Offer::Operation& operation, accept.operations()) {
     switch (operation.type()) {
-      case Offer::Operation::LAUNCH: {
+      case Offer::Operation::LAUNCH:
+      case Offer::Operation::LAUNCH_GROUP: {
+        const RepeatedPtrField<TaskInfo>& tasks = [&]() {
+          if (operation.type() == Offer::Operation::LAUNCH) {
+            return operation.launch().task_infos();
+          } else if (operation.type() == Offer::Operation::LAUNCH_GROUP) {
+            return operation.launch_group().task_group().tasks();
+          }
+          UNREACHABLE();
+        }();
+
         // Authorize the tasks. A task is in 'framework->pendingTasks'
         // before it is authorized.
-        foreach (const TaskInfo& task, operation.launch().task_infos()) {
+        foreach (const TaskInfo& task, tasks) {
           futures.push_back(authorizeTask(task, framework));
 
           // Add to pending tasks.
@@ -3371,12 +3395,6 @@ void Master::accept(
         break;
       }
 
-      case Offer::Operation::LAUNCH_GROUP : {
-        // TODO(vinod): Implement this.
-        LOG(WARNING) << "Ignoring unimplemented LAUNCH_GROUP operation";
-        break;
-      }
-
       // NOTE: When handling RESERVE and UNRESERVE operations, authorization
       // will proceed even if no principal is specified, although currently
       // resources cannot be reserved or unreserved unless a principal is
@@ -3485,11 +3503,21 @@ void Master::_accept(
 
   if (slave == nullptr || !slave->connected) {
     foreach (const Offer::Operation& operation, accept.operations()) {
-      if (operation.type() != Offer::Operation::LAUNCH) {
+      if (operation.type() != Offer::Operation::LAUNCH &&
+          operation.type() != Offer::Operation::LAUNCH_GROUP) {
         continue;
       }
 
-      foreach (const TaskInfo& task, operation.launch().task_infos()) {
+      const RepeatedPtrField<TaskInfo>& tasks = [&]() {
+        if (operation.type() == Offer::Operation::LAUNCH) {
+          return operation.launch().task_infos();
+        } else {
+          CHECK_EQ(Offer::Operation::LAUNCH_GROUP, operation.type());
+          return operation.launch_group().task_group().tasks();
+        }
+      }();
+
+      foreach (const TaskInfo& task, tasks) {
         const TaskStatus::Reason reason =
             slave == nullptr ? TaskStatus::REASON_SLAVE_REMOVED
                           : TaskStatus::REASON_SLAVE_DISCONNECTED;
@@ -3885,9 +3913,177 @@ void Master::_accept(
         break;
       }
 
-      case Offer::Operation::LAUNCH_GROUP : {
-        // TODO(vinod): Implement this.
-        LOG(WARNING) << "Ignoring unimplemented LAUNCH_GROUP operation";
+      case Offer::Operation::LAUNCH_GROUP: {
+        // We must ensure that the entire group can be launched. This
+        // means all tasks in the group must be authorized and valid.
+        // If any tasks in the group have been killed in the interim
+        // we must kill the entire group.
+        const ExecutorInfo& executor = operation.launch_group().executor();
+        const TaskGroupInfo& taskGroup = operation.launch_group().task_group();
+
+        // Note that we do not fill in the `ExecutorInfo.framework_id`
+        // since we do not have to support backwards compatibility like
+        // in the `Launch` operation case.
+
+        // TODO(bmahler): Consider injecting some default (cpus, mem, disk)
+        // resources when the framework omits the executor resources.
+
+        // See if there are any validation or authorization errors.
+        // Note that we'll only report the first error we encounter
+        // for the group.
+        Option<Error> error =
+          validation::task::group::validate(
+              taskGroup, executor, framework, slave, _offeredResources);
+
+        Option<TaskStatus::Reason> reason = None();
+
+        if (error.isSome()) {
+          reason = TaskStatus::REASON_TASK_GROUP_INVALID;
+        } else {
+          foreach (const TaskInfo& task, taskGroup.tasks()) {
+            Future<bool> authorization = authorizations.front();
+            authorizations.pop_front();
+
+            CHECK(!authorization.isDiscarded());
+
+            if (authorization.isFailed()) {
+              error = Error("Failed to authorize task"
+                            " '" + stringify(task.task_id()) + "'"
+                            ": " + authorization.failure());
+
+              reason = TaskStatus::REASON_TASK_GROUP_UNAUTHORIZED;
+
+              break;
+            }
+
+            if (!authorization.get()) {
+              string user = framework->info.user(); // Default user.
+              if (task.has_command() && task.command().has_user()) {
+                user = task.command().user();
+              }
+
+              error = Error("Task '" + stringify(task.task_id()) + "'"
+                            " is not authorized to launch as"
+                            " user '" + user + "'");
+
+              reason = TaskStatus::REASON_TASK_GROUP_UNAUTHORIZED;
+
+              break;
+            }
+          }
+        }
+
+        if (error.isSome()) {
+          CHECK_SOME(reason);
+
+          // NOTE: If some of these invalid or unauthorized tasks were
+          // killed already, here we end up sending a TASK_ERROR after
+          // having already sent TASK_KILLED.
+          foreach (const TaskInfo& task, taskGroup.tasks()) {
+            const StatusUpdate& update = protobuf::createStatusUpdate(
+                framework->id(),
+                task.slave_id(),
+                task.task_id(),
+                TASK_ERROR,
+                TaskStatus::SOURCE_MASTER,
+                None(),
+                error->message,
+                reason.get());
+
+            metrics->tasks_error++;
+
+            metrics->incrementTasksStates(
+                TASK_ERROR, TaskStatus::SOURCE_MASTER, reason.get());
+
+            forward(update, UPID(), framework);
+          }
+
+          break;
+        }
+
+        // Remove all the tasks from being pending. If any of the tasks
+        // have been killed in the interim, we will send TASK_KILLED
+        // for all other tasks in the group, since a TaskGroup must
+        // be delivered in its entirety.
+        hashset<TaskID> killed;
+        foreach (const TaskInfo& task, taskGroup.tasks()) {
+          bool pending = framework->pendingTasks.contains(task.task_id());
+          framework->pendingTasks.erase(task.task_id());
+
+          if (!pending) {
+            killed.insert(task.task_id());
+          }
+        }
+
+        // If task(s) were killed, send TASK_KILLED for
+        // all of the remaining tasks.
+        //
+        // TODO(bmahler): Do this killing when processing
+        // the `Kill` call, rather than doing it here.
+        if (!killed.empty()) {
+          foreach (const TaskInfo& task, taskGroup.tasks()) {
+            if (!killed.contains(task.task_id())) {
+              const StatusUpdate& update = protobuf::createStatusUpdate(
+                  framework->id(),
+                  task.slave_id(),
+                  task.task_id(),
+                  TASK_KILLED,
+                  TaskStatus::SOURCE_MASTER,
+                  None(),
+                  "A task within the task group was killed before"
+                  " delivery to the agent");
+
+              metrics->tasks_killed++;
+
+              // TODO(bmahler): Increment the task state source metric,
+              // we currently cannot because each source
+              // requires a reason.
+
+              forward(update, UPID(), framework);
+            }
+          }
+
+          break;
+        }
+
+        // Now launch the task group!
+        RunTaskGroupMessage message;
+        message.mutable_framework()->CopyFrom(framework->info);
+        message.mutable_executor()->CopyFrom(executor);
+        message.mutable_task_group()->CopyFrom(taskGroup);
+
+        set<TaskID> taskIds;
+        Resources totalResources;
+
+        for (int i = 0; i < message.task_group().tasks().size(); ++i) {
+          TaskInfo* task = message.mutable_task_group()->mutable_tasks(i);
+
+          taskIds.insert(task->task_id());
+          totalResources += task->resources();
+
+          const Resources consumed = addTask(*task, framework, slave);
+
+          CHECK(_offeredResources.contains(consumed))
+            << _offeredResources << " does not contain " << consumed;
+
+          _offeredResources -= consumed;
+
+          if (HookManager::hooksAvailable()) {
+            // Set labels retrieved from label-decorator hooks.
+            task->mutable_labels()->CopyFrom(
+                HookManager::masterLaunchTaskLabelDecorator(
+                    *task,
+                    framework->info,
+                    slave->info));
+          }
+        }
+
+        LOG(INFO) << "Launching task group " << stringify(taskIds)
+                  << " of framework " << *framework
+                  << " with resources " << totalResources
+                  << " on agent " << *slave;
+
+        send(slave->pid, message);
         break;
       }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/bf3957f4/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 7b5e24f..17bb352 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -301,6 +301,21 @@ message RunTaskMessage {
 
 
 /**
+ * This message either notifies an existing executor to run a task
+ * group, or starts a new executor and runs the task group. This
+ * message is sent when scheduler::Call::Accept is sent with
+ * Offer::Operation::LaunchGroup.
+ *
+ * See executor::Event::LaunchGroup.
+ */
+message RunTaskGroupMessage {
+  required FrameworkInfo framework = 1;
+  required ExecutorInfo executor = 2;
+  required TaskGroupInfo task_group = 3;
+}
+
+
+/**
  * Kills a specific task.
  *
  * See scheduler::Call::Kill and executor::Event::Kill.

Reply via email to