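Updated mesos-execute to support task groups. With this patch, mesos-execute can launch a task group supplied through the new `--task_group` flag. A single task specified through the existing command-line options (e.g. `--name`, `--command`) is still supported for backward compatibility; the two modes are mutually exclusive.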
Review: https://reviews.apache.org/r/51623/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f162ade0 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f162ade0 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f162ade0 Branch: refs/heads/master Commit: f162ade0f9d4c93129f935c54d100cc173579361 Parents: e84c2c4 Author: Abhishek Dasgupta <a10gu...@linux.vnet.ibm.com> Authored: Mon Sep 19 13:31:43 2016 -0700 Committer: Vinod Kone <vinodk...@gmail.com> Committed: Mon Sep 19 13:31:43 2016 -0700 ---------------------------------------------------------------------- src/cli/execute.cpp | 299 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 221 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/f162ade0/src/cli/execute.cpp ---------------------------------------------------------------------- diff --git a/src/cli/execute.cpp b/src/cli/execute.cpp index f2e28e4..f180672 100644 --- a/src/cli/execute.cpp +++ b/src/cli/execute.cpp @@ -62,6 +62,7 @@ using mesos::v1::CommandInfo; using mesos::v1::ContainerInfo; using mesos::v1::Credential; using mesos::v1::Environment; +using mesos::v1::ExecutorInfo; using mesos::v1::FrameworkID; using mesos::v1::FrameworkInfo; using mesos::v1::Image; @@ -69,6 +70,7 @@ using mesos::v1::Label; using mesos::v1::Labels; using mesos::v1::Offer; using mesos::v1::Resources; +using mesos::v1::TaskGroupInfo; using mesos::v1::TaskID; using mesos::v1::TaskInfo; using mesos::v1::TaskState; @@ -93,6 +95,46 @@ public: "master", "Mesos master (e.g., IP:PORT)."); + add(&task_group, + "task_group", + "The value could be a JSON-formatted string of `TaskGroupInfo` or a\n" + "file path containing the JSON-formatted `TaskGroupInfo`. Path must\n" + "be of the form `file:///path/to/file` or `/path/to/file`." 
+ "\n" + "See the `TaskGroupInfo` message in `mesos.proto` for the expected\n" + "format. NOTE: `agent_id` need not to be set.\n" + "\n" + "Example:\n" + "{\n" + " \"tasks\":\n" + " [\n" + " {\n" + " \"name\": \"Name of the task\",\n" + " \"task_id\": {\"value\" : \"Id of the task\"},\n" + " \"agent_id\": {\"value\" : \"\"},\n" + " \"resources\": [{\n" + " \"name\": \"cpus\",\n" + " \"type\": \"SCALAR\",\n" + " \"scalar\": {\n" + " \"value\": 0.1\n" + " },\n" + " \"role\": \"*\"\n" + " },\n" + " {\n" + " \"name\": \"mem\",\n" + " \"type\": \"SCALAR\",\n" + " \"scalar\": {\n" + " \"value\": 32\n" + " },\n" + " \"role\": \"*\"\n" + " }],\n" + " \"command\": {\n" + " \"value\": \"sleep 1000\"\n" + " }\n" + " }\n" + " ]\n" + "}"); + add(&name, "name", "Name for the command."); @@ -225,6 +267,7 @@ public: Option<string> master; Option<string> name; + Option<TaskGroupInfo> task_group; bool shell; Option<string> command; Option<hashmap<string, string>> environment; @@ -255,14 +298,17 @@ public: const string& _master, const Option<Duration>& _killAfter, const Option<Credential>& _credential, - const TaskInfo& _task) + const Option<TaskInfo>& _task, + const Option<TaskGroupInfo> _taskGroup) : state(DISCONNECTED), frameworkInfo(_frameworkInfo), master(_master), killAfter(_killAfter), credential(_credential), task(_task), - launched(false) {} + taskGroup(_taskGroup), + launched(false), + terminatedTaskCount(0) {} virtual ~CommandScheduler() {} @@ -338,43 +384,106 @@ protected: foreach (const Offer& offer, offers) { Resources offered = offer.resources(); - TaskInfo _task = task; + Resources requiredResources; - if (!launched && offered.flatten().contains(_task.resources())) { - _task.mutable_agent_id()->MergeFrom(offer.agent_id()); + CHECK_NE(task.isSome(), taskGroup.isSome()) + << "Either task or task group should be set but not both"; - // Takes resources first from the specified role, then from '*'. 
- Try<Resources> flattened = - Resources(_task.resources()).flatten(frameworkInfo.role()); + if (task.isSome()) { + requiredResources = Resources(task.get().resources()); + } else { + foreach (const TaskInfo& _task, taskGroup->tasks()) { + requiredResources += Resources(_task.resources()); + } + } - // `frameworkInfo.role()` must be valid as it's allowed to register. - CHECK_SOME(flattened); - Option<Resources> resources = offered.find(flattened.get()); + if (!launched && offered.flatten().contains(requiredResources)) { + TaskInfo _task; + TaskGroupInfo _taskGroup; - CHECK_SOME(resources); + if (task.isSome()) { + _task = task.get(); + _task.mutable_agent_id()->MergeFrom(offer.agent_id()); - _task.mutable_resources()->CopyFrom(resources.get()); + // Takes resources first from the specified role, then from '*'. + Try<Resources> flattened = + Resources(_task.resources()).flatten(frameworkInfo.role()); - Call call; - call.set_type(Call::ACCEPT); + // `frameworkInfo.role()` must be valid as it's allowed to register. + CHECK_SOME(flattened); + Option<Resources> resources = offered.find(flattened.get()); - CHECK(frameworkInfo.has_id()); - call.mutable_framework_id()->CopyFrom(frameworkInfo.id()); + CHECK_SOME(resources); - Call::Accept* accept = call.mutable_accept(); - accept->add_offer_ids()->CopyFrom(offer.id()); + _task.mutable_resources()->CopyFrom(resources.get()); + } else { + foreach (TaskInfo _task, taskGroup->tasks()) { + _task.mutable_agent_id()->MergeFrom(offer.agent_id()); - Offer::Operation* operation = accept->add_operations(); - operation->set_type(Offer::Operation::LAUNCH); + // Takes resources first from the specified role, then from '*'. + Try<Resources> flattened = + Resources(_task.resources()).flatten(frameworkInfo.role()); - operation->mutable_launch()->add_task_infos()->CopyFrom(_task); + // `frameworkInfo.role()` must be valid as it's allowed to + // register. 
+ CHECK_SOME(flattened); + Option<Resources> resources = offered.find(flattened.get()); - mesos->send(call); + CHECK_SOME(resources); + + _task.mutable_resources()->CopyFrom(resources.get()); + _taskGroup.add_tasks()->CopyFrom(_task); + } + } + Call call; + call.set_type(Call::ACCEPT); - cout << "Submitted task '" << _task.name() << "' to agent '" - << offer.agent_id() << "'" << endl; + CHECK(frameworkInfo.has_id()); + call.mutable_framework_id()->CopyFrom(frameworkInfo.id()); - launched = true; + Call::Accept* accept = call.mutable_accept(); + accept->add_offer_ids()->CopyFrom(offer.id()); + + Offer::Operation* operation = accept->add_operations(); + + if (task.isSome()) { + operation->set_type(Offer::Operation::LAUNCH); + operation->mutable_launch()->add_task_infos()->CopyFrom(_task); + } else { + operation->set_type(Offer::Operation::LAUNCH_GROUP); + + ExecutorInfo* executorInfo = + operation->mutable_launch_group()->mutable_executor(); + + executorInfo->set_type(ExecutorInfo::DEFAULT); + executorInfo->mutable_executor_id()->set_value( + "default-executor"); + + executorInfo->mutable_framework_id()->CopyFrom(frameworkInfo.id()); + executorInfo->mutable_resources()->CopyFrom( + Resources::parse("cpus:0.1;mem:32;disk:32").get()); + + operation->mutable_launch_group()->mutable_task_group()->CopyFrom( + _taskGroup); + } + + mesos->send(call); + + if (task.isSome()) { + cout << "Submitted task '" << task.get().name() << "' to agent '" + << offer.agent_id() << "'" << endl; + } else { + vector<TaskID> taskIds; + + foreach (const TaskInfo& _task, taskGroup->tasks()) { + taskIds.push_back(_task.task_id()); + } + + cout << "Submitted task group with tasks "<< taskIds + << " to agent '" << offer.agent_id() << "'" << endl; + } + + launched = true; } else { Call call; call.set_type(Call::DECLINE); @@ -445,7 +554,8 @@ protected: { CHECK_EQ(SUBSCRIBED, state); - CHECK_EQ(task.name(), status.task_id().value()); + CHECK_NE(task.isSome(), taskGroup.isSome()) + << "Either task or 
task group should be set but not both"; cout << "Received status update " << status.state() << " for task '" << status.task_id() << "'" << endl; @@ -487,7 +597,15 @@ protected: } if (mesos::internal::protobuf::isTerminalState(devolve(status).state())) { - terminate(self()); + if (task.isSome()) { + terminate(self()); + } else { + terminatedTaskCount++; + + if (terminatedTaskCount == taskGroup->tasks().size()) { + terminate(self()); + } + } } } @@ -503,8 +621,10 @@ private: const string master; const Option<Duration> killAfter; const Option<Credential> credential; - const TaskInfo task; + const Option<TaskInfo> task; + const Option<TaskGroupInfo> taskGroup; bool launched; + int terminatedTaskCount; Owned<Mesos> mesos; }; @@ -643,14 +763,30 @@ int main(int argc, char** argv) return EXIT_FAILURE; } - if (flags.name.isNone()) { - cerr << flags.usage("Missing required option --name") << endl; + if (flags.task_group.isSome() && + (flags.name.isSome() || + flags.command.isSome() || + flags.environment.isSome() || + flags.appc_image.isSome() || + flags.docker_image.isSome() || + flags.volumes.isSome())) { + cerr << flags.usage( + "Either task or task group should be set but not both. 
Provide" + " either '--name, --command, --env, --appc_image, --docker_image," + " --volumes' OR '--task_group'") << endl; return EXIT_FAILURE; } - if (flags.shell && flags.command.isNone()) { - cerr << flags.usage("Missing required option --command") << endl; - return EXIT_FAILURE; + if (flags.task_group.isNone()) { + if (flags.name.isNone()) { + cerr << flags.usage("Missing required option --name") << endl; + return EXIT_FAILURE; + } + + if (flags.shell && flags.command.isNone()) { + cerr << flags.usage("Missing required option --command") << endl; + return EXIT_FAILURE; + } } Result<string> user = os::user(); @@ -809,62 +945,68 @@ int main(int argc, char** argv) } } - TaskInfo task; - task.set_name(flags.name.get()); - task.mutable_task_id()->set_value(flags.name.get()); + Option<TaskInfo> taskInfo = None(); - static const Try<Resources> resources = Resources::parse(flags.resources); + if (flags.task_group.isNone()) { + TaskInfo task; + task.set_name(flags.name.get()); + task.mutable_task_id()->set_value(flags.name.get()); - if (resources.isError()) { - EXIT(EXIT_FAILURE) - << "Failed to parse resources '" << flags.resources << "': " - << resources.error(); - } + static const Try<Resources> resources = Resources::parse(flags.resources); - task.mutable_resources()->CopyFrom(resources.get()); + if (resources.isError()) { + EXIT(EXIT_FAILURE) + << "Failed to parse resources '" << flags.resources << "': " + << resources.error(); + } - CommandInfo* commandInfo = task.mutable_command(); + task.mutable_resources()->CopyFrom(resources.get()); - if (flags.shell) { - CHECK_SOME(flags.command); + CommandInfo* commandInfo = task.mutable_command(); - commandInfo->set_shell(true); - commandInfo->set_value(flags.command.get()); - } else { - // TODO(gilbert): Treat 'command' as executable value and arguments. 
- commandInfo->set_shell(false); - } + if (flags.shell) { + CHECK_SOME(flags.command); - if (flags.environment.isSome()) { - Environment* environment_ = commandInfo->mutable_environment(); - foreachpair ( - const string& name, const string& value, environment.get()) { - Environment::Variable* environmentVariable = - environment_->add_variables(); + commandInfo->set_shell(true); + commandInfo->set_value(flags.command.get()); + } else { + // TODO(gilbert): Treat 'command' as executable value and arguments. + commandInfo->set_shell(false); + } - environmentVariable->set_name(name); - environmentVariable->set_value(value); + if (flags.environment.isSome()) { + Environment* environment_ = commandInfo->mutable_environment(); + foreachpair ( + const string& name, const string& value, environment.get()) { + Environment::Variable* environmentVariable = + environment_->add_variables(); + + environmentVariable->set_name(name); + environmentVariable->set_value(value); + } } - } - if (uri.isSome()) { - task.mutable_command()->add_uris()->set_value(uri.get()); - } + if (uri.isSome()) { + task.mutable_command()->add_uris()->set_value(uri.get()); + } - Result<ContainerInfo> containerInfo = - getContainerInfo( - flags.containerizer, - volumes, - flags.networks, - appcImage, - dockerImage); + Result<ContainerInfo> containerInfo = + getContainerInfo( + flags.containerizer, + volumes, + flags.networks, + appcImage, + dockerImage); - if (containerInfo.isError()){ - EXIT(EXIT_FAILURE) << containerInfo.error(); - } + if (containerInfo.isError()){ + EXIT(EXIT_FAILURE) << containerInfo.error(); + } + + if (containerInfo.isSome()) { + task.mutable_container()->CopyFrom(containerInfo.get()); + } - if (containerInfo.isSome()) { - task.mutable_container()->CopyFrom(containerInfo.get()); + taskInfo = task; } Owned<CommandScheduler> scheduler( @@ -873,7 +1015,8 @@ int main(int argc, char** argv) flags.master.get(), flags.kill_after, credential, - task)); + taskInfo, + flags.task_group)); 
process::spawn(scheduler.get()); process::wait(scheduler.get());