Implemented 'GetTasks' call in v1 agent API. Review: https://reviews.apache.org/r/49759/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d8e4b05b Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d8e4b05b Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d8e4b05b Branch: refs/heads/master Commit: d8e4b05bce33e9a1a459d328fc7f97c73fe4eeee Parents: 950e9ce Author: haosdent huang <haosd...@gmail.com> Authored: Wed Jul 13 12:10:16 2016 -0700 Committer: Vinod Kone <vinodk...@gmail.com> Committed: Wed Jul 13 12:10:16 2016 -0700 ---------------------------------------------------------------------- include/mesos/agent/agent.proto | 24 +++++ include/mesos/v1/agent/agent.proto | 24 +++++ src/slave/http.cpp | 184 ++++++++++++++++++++++++++++++++ src/slave/slave.hpp | 10 ++ src/slave/validation.cpp | 3 + 5 files changed, 245 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/d8e4b05b/include/mesos/agent/agent.proto ---------------------------------------------------------------------- diff --git a/include/mesos/agent/agent.proto b/include/mesos/agent/agent.proto index 2d39a63..528b8b3 100644 --- a/include/mesos/agent/agent.proto +++ b/include/mesos/agent/agent.proto @@ -52,6 +52,7 @@ message Call { GET_CONTAINERS = 10; GET_FRAMEWORKS = 11; // Retrieves the information about known frameworks. GET_EXECUTORS = 12; // Retrieves the information about known executors. + GET_TASKS = 13; // Retrieves the information about known tasks. } // Provides a snapshot of the current metrics tracked by the agent. @@ -124,6 +125,7 @@ message Response { GET_CONTAINERS = 9; GET_FRAMEWORKS = 10; // See 'GetFrameworks' below. GET_EXECUTORS = 11; // See 'GetExecutors' below. + GET_TASKS = 12; // See 'GetTasks' below. } // `healthy` would be true if the agent is healthy. Delayed responses are also @@ -207,6 +209,27 @@ message Response { repeated Executor completed_executors = 2; } + // Lists information about all the tasks known to the agent at the current + // time. + message GetTasks { + // Tasks that are pending in the agent's queue before an executor is + // launched. + repeated Task pending_tasks = 1; + + // Tasks that are enqueued for a launched executor that has not yet + // registered. + repeated Task queued_tasks = 2; + + // Tasks that are running. + repeated Task launched_tasks = 3; + + // Tasks that are terminated but pending updates. + repeated Task terminated_tasks = 4; + + // Tasks that are terminated and updates acked. + repeated Task completed_tasks = 5; + } + optional Type type = 1; optional GetHealth get_health = 2; @@ -220,4 +243,5 @@ message Response { optional GetContainers get_containers = 10; optional GetFrameworks get_frameworks = 11; optional GetExecutors get_executors = 12; + optional GetTasks get_tasks = 13; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mesos/blob/d8e4b05b/include/mesos/v1/agent/agent.proto ---------------------------------------------------------------------- diff --git a/include/mesos/v1/agent/agent.proto b/include/mesos/v1/agent/agent.proto index 052f942..699a17b 100644 --- a/include/mesos/v1/agent/agent.proto +++ b/include/mesos/v1/agent/agent.proto @@ -52,6 +52,7 @@ message Call { GET_CONTAINERS = 10; GET_FRAMEWORKS = 11; // Retrieves the information about known frameworks. GET_EXECUTORS = 12; // Retrieves the information about known executors. + GET_TASKS = 13; // Retrieves the information about known tasks. } // Provides a snapshot of the current metrics tracked by the agent. @@ -124,6 +125,7 @@ message Response { GET_CONTAINERS = 9; GET_FRAMEWORKS = 10; // See 'GetFrameworks' below. GET_EXECUTORS = 11; // See 'GetExecutors' below. + GET_TASKS = 12; // See 'GetTasks' below. } // `healthy` would be true if the agent is healthy. Delayed responses are also @@ -207,6 +209,27 @@ message Response { repeated Executor completed_executors = 2; } + // Lists information about all the tasks known to the agent at the current + // time. + message GetTasks { + // Tasks that are pending in the agent's queue before an executor is + // launched. + repeated Task pending_tasks = 1; + + // Tasks that are enqueued for a launched executor that has not yet + // registered. + repeated Task queued_tasks = 2; + + // Tasks that are running. + repeated Task launched_tasks = 3; + + // Tasks that are terminated but pending updates. + repeated Task terminated_tasks = 4; + + // Tasks that are terminated and updates acked. + repeated Task completed_tasks = 5; + } + optional Type type = 1; optional GetHealth get_health = 2; @@ -220,4 +243,5 @@ message Response { optional GetContainers get_containers = 10; optional GetFrameworks get_frameworks = 11; optional GetExecutors get_executors = 12; + optional GetTasks get_tasks = 13; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mesos/blob/d8e4b05b/src/slave/http.cpp ---------------------------------------------------------------------- diff --git a/src/slave/http.cpp b/src/slave/http.cpp index a242e0b..63968bf 100644 --- a/src/slave/http.cpp +++ b/src/slave/http.cpp @@ -395,6 +395,9 @@ Future<Response> Slave::Http::api( case agent::Call::GET_EXECUTORS: return getExecutors(call, principal, acceptType); + + case agent::Call::GET_TASKS: + return getTasks(call, principal, acceptType); } UNREACHABLE(); @@ -1227,6 +1230,187 @@ agent::Response::GetExecutors Slave::Http::_getExecutors( } +Future<Response> Slave::Http::getTasks( + const agent::Call& call, + const Option<string>& principal, + ContentType contentType) const +{ + CHECK_EQ(agent::Call::GET_TASKS, call.type()); + + // Retrieve Approvers for authorizing frameworks and tasks. + Future<Owned<ObjectApprover>> frameworksApprover; + Future<Owned<ObjectApprover>> tasksApprover; + Future<Owned<ObjectApprover>> executorsApprover; + if (slave->authorizer.isSome()) { + authorization::Subject subject; + if (principal.isSome()) { + subject.set_value(principal.get()); + } + + frameworksApprover = slave->authorizer.get()->getObjectApprover( + subject, authorization::VIEW_FRAMEWORK); + + tasksApprover = slave->authorizer.get()->getObjectApprover( + subject, authorization::VIEW_TASK); + + executorsApprover = slave->authorizer.get()->getObjectApprover( + subject, authorization::VIEW_EXECUTOR); + } else { + frameworksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover()); + tasksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover()); + executorsApprover = Owned<ObjectApprover>(new AcceptingObjectApprover()); + } + + return collect(frameworksApprover, tasksApprover, executorsApprover) + .then(defer(slave->self(), + [this, contentType](const tuple<Owned<ObjectApprover>, + Owned<ObjectApprover>, + Owned<ObjectApprover>>& approvers) + -> Future<Response> { + // Get approver from tuple. + Owned<ObjectApprover> frameworksApprover; + Owned<ObjectApprover> tasksApprover; + Owned<ObjectApprover> executorsApprover; + tie(frameworksApprover, tasksApprover, executorsApprover) = approvers; + + agent::Response response; + response.set_type(agent::Response::GET_TASKS); + + response.mutable_get_tasks()->CopyFrom( + _getTasks(frameworksApprover, + tasksApprover, + executorsApprover)); + + return OK(serialize(contentType, evolve(response)), + stringify(contentType)); + })); +} + + +agent::Response::GetTasks Slave::Http::_getTasks( + const Owned<ObjectApprover>& frameworksApprover, + const Owned<ObjectApprover>& tasksApprover, + const Owned<ObjectApprover>& executorsApprover) const +{ + // Construct framework list with both active and completed frameworks. + vector<const Framework*> frameworks; + foreachvalue (Framework* framework, slave->frameworks) { + // Skip unauthorized frameworks. + if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) { + continue; + } + + frameworks.push_back(framework); + } + + foreach (const Owned<Framework>& framework, slave->completedFrameworks) { + // Skip unauthorized frameworks. + if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) { + continue; + } + + frameworks.push_back(framework.get()); + } + + // Construct executor list with both active and completed executors. + hashmap<const Executor*, const Framework*> executors; + foreach (const Framework* framework, frameworks) { + foreachvalue (Executor* executor, framework->executors) { + // Skip unauthorized executors. + if (!approveViewExecutorInfo(executorsApprover, + executor->info, + framework->info)) { + continue; + } + + executors.put(executor, framework); + } + + foreach (const Owned<Executor>& executor, framework->completedExecutors) { + // Skip unauthorized executors. + if (!approveViewExecutorInfo(executorsApprover, + executor->info, + framework->info)) { + continue; + } + + executors.put(executor.get(), framework); + } + } + + agent::Response::GetTasks getTasks; + + foreach (const Framework* framework, frameworks) { + // Pending tasks. + typedef hashmap<TaskID, TaskInfo> TaskMap; + foreachvalue (const TaskMap& taskInfos, framework->pending) { + foreachvalue (const TaskInfo& taskInfo, taskInfos) { + // Skip unauthorized tasks. + if (!approveViewTaskInfo(tasksApprover, taskInfo, framework->info)) { + continue; + } + + const Task& task = + protobuf::createTask(taskInfo, TASK_STAGING, framework->id()); + + getTasks.add_pending_tasks()->CopyFrom(task); + } + } + } + + foreachpair (const Executor* executor, + const Framework* framework, + executors) { + // Queued tasks. + foreach (const TaskInfo& taskInfo, executor->queuedTasks.values()) { + // Skip unauthorized tasks. + if (!approveViewTaskInfo(tasksApprover, taskInfo, framework->info)) { + continue; + } + + const Task& task = + protobuf::createTask(taskInfo, TASK_STAGING, framework->id()); + + getTasks.add_queued_tasks()->CopyFrom(task); + } + + // Launched tasks. + foreach (Task* task, executor->launchedTasks.values()) { + CHECK_NOTNULL(task); + // Skip unauthorized tasks. + if (!approveViewTask(tasksApprover, *task, framework->info)) { + continue; + } + + getTasks.add_launched_tasks()->CopyFrom(*task); + } + + // Terminated tasks. + foreach (Task* task, executor->terminatedTasks.values()) { + CHECK_NOTNULL(task); + // Skip unauthorized tasks. + if (!approveViewTask(tasksApprover, *task, framework->info)) { + continue; + } + + getTasks.add_terminated_tasks()->CopyFrom(*task); + } + + // Completed tasks. + foreach (const std::shared_ptr<Task>& task, executor->completedTasks) { + // Skip unauthorized tasks. + if (!approveViewTask(tasksApprover, *task.get(), framework->info)) { + continue; + } + + getTasks.add_completed_tasks()->CopyFrom(*task); + } + } + + return getTasks; +} + + string Slave::Http::STATISTICS_HELP() { return HELP( http://git-wip-us.apache.org/repos/asf/mesos/blob/d8e4b05b/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index 48cf77d..4995c84 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -566,6 +566,16 @@ private: const process::Owned<ObjectApprover>& frameworksApprover, const process::Owned<ObjectApprover>& executorsApprover) const; + process::Future<process::http::Response> getTasks( + const mesos::agent::Call& call, + const Option<std::string>& principal, + ContentType contentType) const; + + mesos::agent::Response::GetTasks _getTasks( + const process::Owned<ObjectApprover>& frameworksApprover, + const process::Owned<ObjectApprover>& tasksApprover, + const process::Owned<ObjectApprover>& executorsApprover) const; + Slave* slave; // Used to rate limit the statistics endpoint. http://git-wip-us.apache.org/repos/asf/mesos/blob/d8e4b05b/src/slave/validation.cpp ---------------------------------------------------------------------- diff --git a/src/slave/validation.cpp b/src/slave/validation.cpp index 717169a..a9f3182 100644 --- a/src/slave/validation.cpp +++ b/src/slave/validation.cpp @@ -96,6 +96,9 @@ Option<Error> validate( case mesos::agent::Call::GET_EXECUTORS: return None(); + + case mesos::agent::Call::GET_TASKS: + return None(); } UNREACHABLE();