Updated mesos-execute to support task groups.

With this patch, mesos-execute supports running
task groups. A single task specified through command
line options is still supported for backward
compatibility.

Review: https://reviews.apache.org/r/51623/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f162ade0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f162ade0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f162ade0

Branch: refs/heads/master
Commit: f162ade0f9d4c93129f935c54d100cc173579361
Parents: e84c2c4
Author: Abhishek Dasgupta <a10gu...@linux.vnet.ibm.com>
Authored: Mon Sep 19 13:31:43 2016 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Mon Sep 19 13:31:43 2016 -0700

----------------------------------------------------------------------
 src/cli/execute.cpp | 299 ++++++++++++++++++++++++++++++++++-------------
 1 file changed, 221 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f162ade0/src/cli/execute.cpp
----------------------------------------------------------------------
diff --git a/src/cli/execute.cpp b/src/cli/execute.cpp
index f2e28e4..f180672 100644
--- a/src/cli/execute.cpp
+++ b/src/cli/execute.cpp
@@ -62,6 +62,7 @@ using mesos::v1::CommandInfo;
 using mesos::v1::ContainerInfo;
 using mesos::v1::Credential;
 using mesos::v1::Environment;
+using mesos::v1::ExecutorInfo;
 using mesos::v1::FrameworkID;
 using mesos::v1::FrameworkInfo;
 using mesos::v1::Image;
@@ -69,6 +70,7 @@ using mesos::v1::Label;
 using mesos::v1::Labels;
 using mesos::v1::Offer;
 using mesos::v1::Resources;
+using mesos::v1::TaskGroupInfo;
 using mesos::v1::TaskID;
 using mesos::v1::TaskInfo;
 using mesos::v1::TaskState;
@@ -93,6 +95,46 @@ public:
         "master",
         "Mesos master (e.g., IP:PORT).");
 
+    add(&task_group,
+        "task_group",
+        "The value could be a JSON-formatted string of `TaskGroupInfo` or a\n"
+        "file path containing the JSON-formatted `TaskGroupInfo`. Path must\n"
+        "be of the form `file:///path/to/file` or `/path/to/file`."
+        "\n"
+        "See the `TaskGroupInfo` message in `mesos.proto` for the expected\n"
+        "format. NOTE: `agent_id` need not to be set.\n"
+        "\n"
+        "Example:\n"
+        "{\n"
+        "  \"tasks\":\n"
+        "     [\n"
+        "        {\n"
+        "         \"name\": \"Name of the task\",\n"
+        "         \"task_id\": {\"value\" : \"Id of the task\"},\n"
+        "         \"agent_id\": {\"value\" : \"\"},\n"
+        "         \"resources\": [{\n"
+        "            \"name\": \"cpus\",\n"
+        "            \"type\": \"SCALAR\",\n"
+        "            \"scalar\": {\n"
+        "                \"value\": 0.1\n"
+        "             },\n"
+        "            \"role\": \"*\"\n"
+        "           },\n"
+        "           {\n"
+        "            \"name\": \"mem\",\n"
+        "            \"type\": \"SCALAR\",\n"
+        "            \"scalar\": {\n"
+        "                \"value\": 32\n"
+        "             },\n"
+        "            \"role\": \"*\"\n"
+        "          }],\n"
+        "         \"command\": {\n"
+        "            \"value\": \"sleep 1000\"\n"
+        "           }\n"
+        "       }\n"
+        "     ]\n"
+        "}");
+
     add(&name,
         "name",
         "Name for the command.");
@@ -225,6 +267,7 @@ public:
 
   Option<string> master;
   Option<string> name;
+  Option<TaskGroupInfo> task_group;
   bool shell;
   Option<string> command;
   Option<hashmap<string, string>> environment;
@@ -255,14 +298,17 @@ public:
       const string& _master,
       const Option<Duration>& _killAfter,
       const Option<Credential>& _credential,
-      const TaskInfo& _task)
+      const Option<TaskInfo>& _task,
+      const Option<TaskGroupInfo> _taskGroup)
     : state(DISCONNECTED),
       frameworkInfo(_frameworkInfo),
       master(_master),
       killAfter(_killAfter),
       credential(_credential),
       task(_task),
-      launched(false) {}
+      taskGroup(_taskGroup),
+      launched(false),
+      terminatedTaskCount(0) {}
 
   virtual ~CommandScheduler() {}
 
@@ -338,43 +384,106 @@ protected:
     foreach (const Offer& offer, offers) {
       Resources offered = offer.resources();
 
-      TaskInfo _task = task;
+      Resources requiredResources;
 
-      if (!launched && offered.flatten().contains(_task.resources())) {
-        _task.mutable_agent_id()->MergeFrom(offer.agent_id());
+      CHECK_NE(task.isSome(), taskGroup.isSome())
+        << "Either task or task group should be set but not both";
 
-        // Takes resources first from the specified role, then from '*'.
-        Try<Resources> flattened =
-          Resources(_task.resources()).flatten(frameworkInfo.role());
+      if (task.isSome()) {
+        requiredResources = Resources(task.get().resources());
+      } else {
+        foreach (const TaskInfo& _task, taskGroup->tasks()) {
+          requiredResources += Resources(_task.resources());
+        }
+      }
 
-        // `frameworkInfo.role()` must be valid as it's allowed to register.
-        CHECK_SOME(flattened);
-        Option<Resources> resources = offered.find(flattened.get());
+      if (!launched && offered.flatten().contains(requiredResources)) {
+        TaskInfo _task;
+        TaskGroupInfo _taskGroup;
 
-        CHECK_SOME(resources);
+        if (task.isSome()) {
+          _task = task.get();
+          _task.mutable_agent_id()->MergeFrom(offer.agent_id());
 
-        _task.mutable_resources()->CopyFrom(resources.get());
+          // Takes resources first from the specified role, then from '*'.
+          Try<Resources> flattened =
+            Resources(_task.resources()).flatten(frameworkInfo.role());
 
-        Call call;
-        call.set_type(Call::ACCEPT);
+          // `frameworkInfo.role()` must be valid as it's allowed to register.
+          CHECK_SOME(flattened);
+          Option<Resources> resources = offered.find(flattened.get());
 
-        CHECK(frameworkInfo.has_id());
-        call.mutable_framework_id()->CopyFrom(frameworkInfo.id());
+          CHECK_SOME(resources);
 
-        Call::Accept* accept = call.mutable_accept();
-        accept->add_offer_ids()->CopyFrom(offer.id());
+          _task.mutable_resources()->CopyFrom(resources.get());
+        } else {
+          foreach (TaskInfo _task, taskGroup->tasks()) {
+              _task.mutable_agent_id()->MergeFrom(offer.agent_id());
 
-        Offer::Operation* operation = accept->add_operations();
-        operation->set_type(Offer::Operation::LAUNCH);
+              // Takes resources first from the specified role, then from '*'.
+              Try<Resources> flattened =
+                Resources(_task.resources()).flatten(frameworkInfo.role());
 
-        operation->mutable_launch()->add_task_infos()->CopyFrom(_task);
+              // `frameworkInfo.role()` must be valid as it's allowed to
+              // register.
+              CHECK_SOME(flattened);
+              Option<Resources> resources = offered.find(flattened.get());
 
-        mesos->send(call);
+              CHECK_SOME(resources);
+
+              _task.mutable_resources()->CopyFrom(resources.get());
+              _taskGroup.add_tasks()->CopyFrom(_task);
+          }
+       }
+       Call call;
+       call.set_type(Call::ACCEPT);
 
-        cout << "Submitted task '" << _task.name() << "' to agent '"
-             << offer.agent_id() << "'" << endl;
+       CHECK(frameworkInfo.has_id());
+       call.mutable_framework_id()->CopyFrom(frameworkInfo.id());
 
-        launched = true;
+       Call::Accept* accept = call.mutable_accept();
+       accept->add_offer_ids()->CopyFrom(offer.id());
+
+       Offer::Operation* operation = accept->add_operations();
+
+       if (task.isSome()) {
+         operation->set_type(Offer::Operation::LAUNCH);
+         operation->mutable_launch()->add_task_infos()->CopyFrom(_task);
+       } else {
+         operation->set_type(Offer::Operation::LAUNCH_GROUP);
+
+         ExecutorInfo* executorInfo =
+           operation->mutable_launch_group()->mutable_executor();
+
+         executorInfo->set_type(ExecutorInfo::DEFAULT);
+         executorInfo->mutable_executor_id()->set_value(
+             "default-executor");
+
+         executorInfo->mutable_framework_id()->CopyFrom(frameworkInfo.id());
+         executorInfo->mutable_resources()->CopyFrom(
+             Resources::parse("cpus:0.1;mem:32;disk:32").get());
+
+         operation->mutable_launch_group()->mutable_task_group()->CopyFrom(
+             _taskGroup);
+       }
+
+       mesos->send(call);
+
+       if (task.isSome()) {
+         cout << "Submitted task '" << task.get().name() << "' to agent '"
+              << offer.agent_id() << "'" << endl;
+       } else {
+         vector<TaskID> taskIds;
+
+         foreach (const TaskInfo& _task, taskGroup->tasks()) {
+           taskIds.push_back(_task.task_id());
+         }
+
+         cout << "Submitted task group with tasks "<< taskIds
+              << " to agent '" << offer.agent_id() << "'" << endl;
+       }
+
+       launched = true;
       } else {
         Call call;
         call.set_type(Call::DECLINE);
@@ -445,7 +554,8 @@ protected:
   {
     CHECK_EQ(SUBSCRIBED, state);
 
-    CHECK_EQ(task.name(), status.task_id().value());
+    CHECK_NE(task.isSome(), taskGroup.isSome())
+      << "Either task or task group should be set but not both";
 
     cout << "Received status update " << status.state()
          << " for task '" << status.task_id() << "'" << endl;
@@ -487,7 +597,15 @@ protected:
     }
 
     if (mesos::internal::protobuf::isTerminalState(devolve(status).state())) {
-      terminate(self());
+      if (task.isSome()) {
+        terminate(self());
+      } else {
+        terminatedTaskCount++;
+
+        if (terminatedTaskCount == taskGroup->tasks().size()) {
+          terminate(self());
+        }
+      }
     }
   }
 
@@ -503,8 +621,10 @@ private:
   const string master;
   const Option<Duration> killAfter;
   const Option<Credential> credential;
-  const TaskInfo task;
+  const Option<TaskInfo> task;
+  const Option<TaskGroupInfo> taskGroup;
   bool launched;
+  int terminatedTaskCount;
   Owned<Mesos> mesos;
 };
 
@@ -643,14 +763,30 @@ int main(int argc, char** argv)
     return EXIT_FAILURE;
   }
 
-  if (flags.name.isNone()) {
-    cerr << flags.usage("Missing required option --name") << endl;
+  if (flags.task_group.isSome() &&
+      (flags.name.isSome() ||
+       flags.command.isSome() ||
+       flags.environment.isSome() ||
+       flags.appc_image.isSome()  ||
+       flags.docker_image.isSome() ||
+       flags.volumes.isSome())) {
+    cerr << flags.usage(
+              "Either task or task group should be set but not both. Provide"
+              " either '--name, --command, --env, --appc_image, --docker_image,"
+              " --volumes' OR '--task_group'") << endl;
     return EXIT_FAILURE;
   }
 
-  if (flags.shell && flags.command.isNone()) {
-    cerr << flags.usage("Missing required option --command") << endl;
-    return EXIT_FAILURE;
+  if (flags.task_group.isNone()) {
+    if (flags.name.isNone()) {
+      cerr << flags.usage("Missing required option --name") << endl;
+      return EXIT_FAILURE;
+    }
+
+    if (flags.shell && flags.command.isNone()) {
+      cerr << flags.usage("Missing required option --command") << endl;
+      return EXIT_FAILURE;
+    }
   }
 
   Result<string> user = os::user();
@@ -809,62 +945,68 @@ int main(int argc, char** argv)
     }
   }
 
-  TaskInfo task;
-  task.set_name(flags.name.get());
-  task.mutable_task_id()->set_value(flags.name.get());
+  Option<TaskInfo> taskInfo = None();
 
-  static const Try<Resources> resources = Resources::parse(flags.resources);
+  if (flags.task_group.isNone()) {
+    TaskInfo task;
+    task.set_name(flags.name.get());
+    task.mutable_task_id()->set_value(flags.name.get());
 
-  if (resources.isError()) {
-    EXIT(EXIT_FAILURE)
-      << "Failed to parse resources '" << flags.resources << "': "
-      << resources.error();
-  }
+    static const Try<Resources> resources = Resources::parse(flags.resources);
 
-  task.mutable_resources()->CopyFrom(resources.get());
+    if (resources.isError()) {
+      EXIT(EXIT_FAILURE)
+        << "Failed to parse resources '" << flags.resources << "': "
+        << resources.error();
+    }
 
-  CommandInfo* commandInfo = task.mutable_command();
+    task.mutable_resources()->CopyFrom(resources.get());
 
-  if (flags.shell) {
-    CHECK_SOME(flags.command);
+    CommandInfo* commandInfo = task.mutable_command();
 
-    commandInfo->set_shell(true);
-    commandInfo->set_value(flags.command.get());
-  } else {
-    // TODO(gilbert): Treat 'command' as executable value and arguments.
-    commandInfo->set_shell(false);
-  }
+    if (flags.shell) {
+      CHECK_SOME(flags.command);
 
-  if (flags.environment.isSome()) {
-    Environment* environment_ = commandInfo->mutable_environment();
-    foreachpair (
-        const string& name, const string& value, environment.get()) {
-      Environment::Variable* environmentVariable =
-        environment_->add_variables();
+      commandInfo->set_shell(true);
+      commandInfo->set_value(flags.command.get());
+    } else {
+      // TODO(gilbert): Treat 'command' as executable value and arguments.
+      commandInfo->set_shell(false);
+    }
 
-      environmentVariable->set_name(name);
-      environmentVariable->set_value(value);
+    if (flags.environment.isSome()) {
+      Environment* environment_ = commandInfo->mutable_environment();
+      foreachpair (
+          const string& name, const string& value, environment.get()) {
+        Environment::Variable* environmentVariable =
+          environment_->add_variables();
+
+        environmentVariable->set_name(name);
+        environmentVariable->set_value(value);
+      }
     }
-  }
 
-  if (uri.isSome()) {
-    task.mutable_command()->add_uris()->set_value(uri.get());
-  }
+    if (uri.isSome()) {
+      task.mutable_command()->add_uris()->set_value(uri.get());
+    }
 
-  Result<ContainerInfo> containerInfo =
-    getContainerInfo(
-      flags.containerizer,
-      volumes,
-      flags.networks,
-      appcImage,
-      dockerImage);
+    Result<ContainerInfo> containerInfo =
+      getContainerInfo(
+        flags.containerizer,
+        volumes,
+        flags.networks,
+        appcImage,
+        dockerImage);
 
-  if (containerInfo.isError()){
-    EXIT(EXIT_FAILURE) << containerInfo.error();
-  }
+    if (containerInfo.isError()){
+      EXIT(EXIT_FAILURE) << containerInfo.error();
+    }
+
+    if (containerInfo.isSome()) {
+      task.mutable_container()->CopyFrom(containerInfo.get());
+    }
 
-  if (containerInfo.isSome()) {
-    task.mutable_container()->CopyFrom(containerInfo.get());
+    taskInfo = task;
   }
 
   Owned<CommandScheduler> scheduler(
@@ -873,7 +1015,8 @@ int main(int argc, char** argv)
         flags.master.get(),
         flags.kill_after,
         credential,
-        task));
+        taskInfo,
+        flags.task_group));
 
   process::spawn(scheduler.get());
   process::wait(scheduler.get());

Reply via email to