Decoupled the implementations of `tasks()` and `getTasks()` in master. The response of `getTasks()` is going to look different from that of `tasks()`, so it doesn't make sense to couple their implementations. Note that this is pure code movement; there is no functional change.
Review: https://reviews.apache.org/r/49418 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9de8d5fa Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9de8d5fa Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9de8d5fa Branch: refs/heads/master Commit: 9de8d5fa0fff2b5372c8f875d94527f21721c0bf Parents: 05ce17d Author: Vinod Kone <vinodk...@gmail.com> Authored: Wed Jun 29 16:49:30 2016 -0700 Committer: Vinod Kone <vinodk...@gmail.com> Committed: Thu Jun 30 16:35:40 2016 -0700 ---------------------------------------------------------------------- src/master/http.cpp | 145 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 105 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/9de8d5fa/src/master/http.cpp ---------------------------------------------------------------------- diff --git a/src/master/http.cpp b/src/master/http.cpp index d5f4e77..338c29a 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -3167,18 +3167,100 @@ Future<Response> Master::Http::tasks( Option<string> order = request.url.query.get("order"); string _order = order.isSome() && (order.get() == "asc") ? "asc" : "des"; - return _tasks(limit, offset, _order, principal) - .then([request](const vector<const Task*>& tasks) -> Response { - auto tasksWriter = [&tasks](JSON::ObjectWriter* writer) { - writer->field("tasks", [&tasks](JSON::ArrayWriter* writer) { - foreach (const Task* task, tasks) { - writer->element(*task); + // Retrieve Approvers for authorizing frameworks and tasks. 
+ Future<Owned<ObjectApprover>> frameworksApprover; + Future<Owned<ObjectApprover>> tasksApprover; + if (master->authorizer.isSome()) { + authorization::Subject subject; + if (principal.isSome()) { + subject.set_value(principal.get()); + } + + frameworksApprover = master->authorizer.get()->getObjectApprover( + subject, authorization::VIEW_FRAMEWORK); + + tasksApprover = master->authorizer.get()->getObjectApprover( + subject, authorization::VIEW_TASK); + } else { + frameworksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover()); + tasksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover()); + } + + return collect(frameworksApprover, tasksApprover) + .then(defer(master->self(), + [=](const tuple<Owned<ObjectApprover>, + Owned<ObjectApprover>>& approvers) + -> Future<Response> { + // Get approver from tuple. + Owned<ObjectApprover> frameworksApprover; + Owned<ObjectApprover> tasksApprover; + tie(frameworksApprover, tasksApprover) = approvers; + + // Construct framework list with both active and completed frameworks. + vector<const Framework*> frameworks; + foreachvalue (Framework* framework, master->frameworks.registered) { + // Skip unauthorized frameworks. + if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) { + continue; + } + + frameworks.push_back(framework); + } + + foreach (const std::shared_ptr<Framework>& framework, + master->frameworks.completed) { + // Skip unauthorized frameworks. + if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) { + continue; + } + + frameworks.push_back(framework.get()); + } + + // Construct task list with both running and finished tasks. + vector<const Task*> tasks; + foreach (const Framework* framework, frameworks) { + foreachvalue (Task* task, framework->tasks) { + CHECK_NOTNULL(task); + // Skip unauthorized tasks. 
+ if (!approveViewTask(tasksApprover, *task, framework->info)) { + continue; + } + + tasks.push_back(task); + } + foreach (const std::shared_ptr<Task>& task, framework->completedTasks) { + // Skip unauthorized tasks. + if (!approveViewTask(tasksApprover, *task.get(), framework->info)) { + continue; + } + + tasks.push_back(task.get()); + } + } + + // Sort tasks by task status timestamp. Default order is descending. + // The earliest timestamp is chosen for comparison when + // multiple are present. + if (_order == "asc") { + sort(tasks.begin(), tasks.end(), TaskComparator::ascending); + } else { + sort(tasks.begin(), tasks.end(), TaskComparator::descending); + } + + auto tasksWriter = [&tasks, limit, offset](JSON::ObjectWriter* writer) { + writer->field("tasks", + [&tasks, limit, offset](JSON::ArrayWriter* writer) { + // Collect 'limit' number of tasks starting from 'offset'. + size_t end = std::min(offset + limit, tasks.size()); + for (size_t i = offset; i < end; i++) { + writer->element(*tasks[i]); } }); }; return OK(jsonify(tasksWriter), request.url.query.get("jsonp")); - }); + })); } @@ -3190,6 +3272,10 @@ Future<Response> Master::Http::getTasks( CHECK_EQ(mesos::master::Call::GET_TASKS, call.type()); // Get list options (limit and offset). + + // TODO(nnielsen): Currently, formatting errors in offset and/or limit + // will silently be ignored. This could be reported to the user instead. + Result<int> result = call.get_tasks().limit(); CHECK_SOME(result); size_t limit = result.get(); @@ -3200,30 +3286,6 @@ Future<Response> Master::Http::getTasks( Option<string> order = call.get_tasks().order(); string _order = order.isSome() && (order.get() == "asc") ? 
"asc" : "des"; - return _tasks(limit, offset, _order, principal) - .then([contentType](const vector<const Task*>& tasks) -> Response { - mesos::master::Response response; - response.set_type(mesos::master::Response::GET_TASKS); - - mesos::master::Response::GetTasks* getTasks = - response.mutable_get_tasks(); - - foreach (const Task* task, tasks) { - getTasks->add_tasks()->CopyFrom(*task); - } - - return OK(serialize(contentType, evolve(response)), - stringify(contentType)); - }); -} - - -Future<vector<const Task*>> Master::Http::_tasks( - const size_t limit, - const size_t offset, - const string& order, - const Option<string>& principal) const -{ // Retrieve Approvers for authorizing frameworks and tasks. Future<Owned<ObjectApprover>> frameworksApprover; Future<Owned<ObjectApprover>> tasksApprover; @@ -3247,15 +3309,12 @@ Future<vector<const Task*>> Master::Http::_tasks( .then(defer(master->self(), [=](const tuple<Owned<ObjectApprover>, Owned<ObjectApprover>>& approvers) - -> vector<const Task*> { + -> Future<Response> { // Get approver from tuple. Owned<ObjectApprover> frameworksApprover; Owned<ObjectApprover> tasksApprover; tie(frameworksApprover, tasksApprover) = approvers; - // TODO(nnielsen): Currently, formatting errors in offset and/or limit - // will silently be ignored. This could be reported to the user instead. - // Construct framework list with both active and completed frameworks. vector<const Framework*> frameworks; foreachvalue (Framework* framework, master->frameworks.registered) { @@ -3266,6 +3325,7 @@ Future<vector<const Task*>> Master::Http::_tasks( frameworks.push_back(framework); } + foreach (const std::shared_ptr<Framework>& framework, master->frameworks.completed) { // Skip unauthorized frameworks. @@ -3301,21 +3361,26 @@ Future<vector<const Task*>> Master::Http::_tasks( // Sort tasks by task status timestamp. Default order is descending. // The earliest timestamp is chosen for comparison when // multiple are present. 
- if (order == "asc") { + if (_order == "asc") { sort(tasks.begin(), tasks.end(), TaskComparator::ascending); } else { sort(tasks.begin(), tasks.end(), TaskComparator::descending); } + mesos::master::Response response; + response.set_type(mesos::master::Response::GET_TASKS); + + mesos::master::Response::GetTasks* getTasks = + response.mutable_get_tasks(); + // Collect 'limit' number of tasks starting from 'offset'. - vector<const Task*> _tasks; size_t end = std::min(offset + limit, tasks.size()); for (size_t i = offset; i < end; i++) { - const Task* task = tasks[i]; - _tasks.push_back(task); + getTasks->add_tasks()->CopyFrom(*tasks[i]); } - return _tasks; + return OK(serialize(contentType, evolve(response)), + stringify(contentType)); })); }