Decoupled implementation of `tasks()` and `getTasks()` in master.

The response of `getTasks()` is going to look different from that of
`tasks()`, so it doesn't make sense to couple their implementations.
Note that this is pure code movement, there is no functional change.

Review: https://reviews.apache.org/r/49418


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9de8d5fa
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9de8d5fa
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9de8d5fa

Branch: refs/heads/master
Commit: 9de8d5fa0fff2b5372c8f875d94527f21721c0bf
Parents: 05ce17d
Author: Vinod Kone <vinodk...@gmail.com>
Authored: Wed Jun 29 16:49:30 2016 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Thu Jun 30 16:35:40 2016 -0700

----------------------------------------------------------------------
 src/master/http.cpp | 145 ++++++++++++++++++++++++++++++++++-------------
 1 file changed, 105 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9de8d5fa/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index d5f4e77..338c29a 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -3167,18 +3167,100 @@ Future<Response> Master::Http::tasks(
   Option<string> order = request.url.query.get("order");
   string _order = order.isSome() && (order.get() == "asc") ? "asc" : "des";
 
-  return _tasks(limit, offset, _order, principal)
-    .then([request](const vector<const Task*>& tasks) -> Response {
-      auto tasksWriter = [&tasks](JSON::ObjectWriter* writer) {
-        writer->field("tasks", [&tasks](JSON::ArrayWriter* writer) {
-          foreach (const Task* task, tasks) {
-            writer->element(*task);
+  // Retrieve Approvers for authorizing frameworks and tasks.
+  Future<Owned<ObjectApprover>> frameworksApprover;
+  Future<Owned<ObjectApprover>> tasksApprover;
+  if (master->authorizer.isSome()) {
+    authorization::Subject subject;
+    if (principal.isSome()) {
+      subject.set_value(principal.get());
+    }
+
+    frameworksApprover = master->authorizer.get()->getObjectApprover(
+        subject, authorization::VIEW_FRAMEWORK);
+
+    tasksApprover = master->authorizer.get()->getObjectApprover(
+        subject, authorization::VIEW_TASK);
+  } else {
+    frameworksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
+    tasksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
+  }
+
+  return collect(frameworksApprover, tasksApprover)
+    .then(defer(master->self(),
+      [=](const tuple<Owned<ObjectApprover>,
+                      Owned<ObjectApprover>>& approvers)
+        -> Future<Response> {
+      // Get approver from tuple.
+      Owned<ObjectApprover> frameworksApprover;
+      Owned<ObjectApprover> tasksApprover;
+      tie(frameworksApprover, tasksApprover) = approvers;
+
+      // Construct framework list with both active and completed frameworks.
+      vector<const Framework*> frameworks;
+      foreachvalue (Framework* framework, master->frameworks.registered) {
+        // Skip unauthorized frameworks.
+        if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+          continue;
+        }
+
+        frameworks.push_back(framework);
+      }
+
+      foreach (const std::shared_ptr<Framework>& framework,
+               master->frameworks.completed) {
+        // Skip unauthorized frameworks.
+        if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+          continue;
+        }
+
+        frameworks.push_back(framework.get());
+      }
+
+      // Construct task list with both running and finished tasks.
+      vector<const Task*> tasks;
+      foreach (const Framework* framework, frameworks) {
+        foreachvalue (Task* task, framework->tasks) {
+          CHECK_NOTNULL(task);
+          // Skip unauthorized tasks.
+          if (!approveViewTask(tasksApprover, *task, framework->info)) {
+            continue;
+          }
+
+          tasks.push_back(task);
+        }
+        foreach (const std::shared_ptr<Task>& task, framework->completedTasks) 
{
+          // Skip unauthorized tasks.
+          if (!approveViewTask(tasksApprover, *task.get(), framework->info)) {
+            continue;
+          }
+
+          tasks.push_back(task.get());
+        }
+      }
+
+      // Sort tasks by task status timestamp. Default order is descending.
+      // The earliest timestamp is chosen for comparison when
+      // multiple are present.
+      if (_order == "asc") {
+        sort(tasks.begin(), tasks.end(), TaskComparator::ascending);
+      } else {
+        sort(tasks.begin(), tasks.end(), TaskComparator::descending);
+      }
+
+      auto tasksWriter = [&tasks, limit, offset](JSON::ObjectWriter* writer) {
+        writer->field("tasks",
+                      [&tasks, limit, offset](JSON::ArrayWriter* writer) {
+          // Collect 'limit' number of tasks starting from 'offset'.
+          size_t end = std::min(offset + limit, tasks.size());
+          for (size_t i = offset; i < end; i++) {
+            writer->element(*tasks[i]);
           }
         });
       };
 
       return OK(jsonify(tasksWriter), request.url.query.get("jsonp"));
-    });
+  }));
 }
 
 
@@ -3190,6 +3272,10 @@ Future<Response> Master::Http::getTasks(
   CHECK_EQ(mesos::master::Call::GET_TASKS, call.type());
 
   // Get list options (limit and offset).
+
+  // TODO(nnielsen): Currently, formatting errors in offset and/or limit
+  // will silently be ignored. This could be reported to the user instead.
+
   Result<int> result = call.get_tasks().limit();
   CHECK_SOME(result);
   size_t limit = result.get();
@@ -3200,30 +3286,6 @@ Future<Response> Master::Http::getTasks(
   Option<string> order = call.get_tasks().order();
   string _order = order.isSome() && (order.get() == "asc") ? "asc" : "des";
 
-  return _tasks(limit, offset, _order, principal)
-    .then([contentType](const vector<const Task*>& tasks) -> Response {
-      mesos::master::Response response;
-      response.set_type(mesos::master::Response::GET_TASKS);
-
-      mesos::master::Response::GetTasks* getTasks =
-        response.mutable_get_tasks();
-
-      foreach (const Task* task, tasks) {
-        getTasks->add_tasks()->CopyFrom(*task);
-      }
-
-      return OK(serialize(contentType, evolve(response)),
-                stringify(contentType));
-    });
-}
-
-
-Future<vector<const Task*>> Master::Http::_tasks(
-    const size_t limit,
-    const size_t offset,
-    const string& order,
-    const Option<string>& principal) const
-{
   // Retrieve Approvers for authorizing frameworks and tasks.
   Future<Owned<ObjectApprover>> frameworksApprover;
   Future<Owned<ObjectApprover>> tasksApprover;
@@ -3247,15 +3309,12 @@ Future<vector<const Task*>> Master::Http::_tasks(
     .then(defer(master->self(),
       [=](const tuple<Owned<ObjectApprover>,
                       Owned<ObjectApprover>>& approvers)
-        -> vector<const Task*> {
+        -> Future<Response> {
       // Get approver from tuple.
       Owned<ObjectApprover> frameworksApprover;
       Owned<ObjectApprover> tasksApprover;
       tie(frameworksApprover, tasksApprover) = approvers;
 
-      // TODO(nnielsen): Currently, formatting errors in offset and/or limit
-      // will silently be ignored. This could be reported to the user instead.
-
       // Construct framework list with both active and completed frameworks.
       vector<const Framework*> frameworks;
       foreachvalue (Framework* framework, master->frameworks.registered) {
@@ -3266,6 +3325,7 @@ Future<vector<const Task*>> Master::Http::_tasks(
 
         frameworks.push_back(framework);
       }
+
       foreach (const std::shared_ptr<Framework>& framework,
                master->frameworks.completed) {
         // Skip unauthorized frameworks.
@@ -3301,21 +3361,26 @@ Future<vector<const Task*>> Master::Http::_tasks(
       // Sort tasks by task status timestamp. Default order is descending.
       // The earliest timestamp is chosen for comparison when
       // multiple are present.
-      if (order == "asc") {
+      if (_order == "asc") {
         sort(tasks.begin(), tasks.end(), TaskComparator::ascending);
       } else {
         sort(tasks.begin(), tasks.end(), TaskComparator::descending);
       }
 
+      mesos::master::Response response;
+      response.set_type(mesos::master::Response::GET_TASKS);
+
+      mesos::master::Response::GetTasks* getTasks =
+        response.mutable_get_tasks();
+
       // Collect 'limit' number of tasks starting from 'offset'.
-      vector<const Task*> _tasks;
       size_t end = std::min(offset + limit, tasks.size());
       for (size_t i = offset; i < end; i++) {
-        const Task* task = tasks[i];
-        _tasks.push_back(task);
+        getTasks->add_tasks()->CopyFrom(*tasks[i]);
       }
 
-      return _tasks;
+      return OK(serialize(contentType, evolve(response)),
+                stringify(contentType));
   }));
 }
 

Reply via email to