Implemented 'GetTasks' call in v1 agent API.

Review: https://reviews.apache.org/r/49759/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d8e4b05b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d8e4b05b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d8e4b05b

Branch: refs/heads/master
Commit: d8e4b05bce33e9a1a459d328fc7f97c73fe4eeee
Parents: 950e9ce
Author: haosdent huang <haosd...@gmail.com>
Authored: Wed Jul 13 12:10:16 2016 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Wed Jul 13 12:10:16 2016 -0700

----------------------------------------------------------------------
 include/mesos/agent/agent.proto    |  24 +++++
 include/mesos/v1/agent/agent.proto |  24 +++++
 src/slave/http.cpp                 | 184 ++++++++++++++++++++++++++++++++
 src/slave/slave.hpp                |  10 ++
 src/slave/validation.cpp           |   3 +
 5 files changed, 245 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d8e4b05b/include/mesos/agent/agent.proto
----------------------------------------------------------------------
diff --git a/include/mesos/agent/agent.proto b/include/mesos/agent/agent.proto
index 2d39a63..528b8b3 100644
--- a/include/mesos/agent/agent.proto
+++ b/include/mesos/agent/agent.proto
@@ -52,6 +52,7 @@ message Call {
     GET_CONTAINERS = 10;
     GET_FRAMEWORKS = 11;    // Retrieves the information about known 
frameworks.
     GET_EXECUTORS = 12;     // Retrieves the information about known executors.
+    GET_TASKS = 13;         // Retrieves the information about known tasks.
   }
 
   // Provides a snapshot of the current metrics tracked by the agent.
@@ -124,6 +125,7 @@ message Response {
     GET_CONTAINERS = 9;
     GET_FRAMEWORKS = 10;           // See 'GetFrameworks' below.
     GET_EXECUTORS = 11;            // See 'GetExecutors' below.
+    GET_TASKS = 12;                // See 'GetTasks' below.
   }
 
   // `healthy` would be true if the agent is healthy. Delayed responses are 
also
@@ -207,6 +209,27 @@ message Response {
     repeated Executor completed_executors = 2;
   }
 
+  // Lists information about all the tasks known to the agent at the current
+  // time.
+  message GetTasks {
+    // Tasks that are pending in the agent's queue before an executor is
+    // launched.
+    repeated Task pending_tasks = 1;
+
+    // Tasks that are enqueued for a launched executor that has not yet
+    // registered.
+    repeated Task queued_tasks = 2;
+
+    // Tasks that are running.
+    repeated Task launched_tasks = 3;
+
+    // Tasks that are terminated but pending updates.
+    repeated Task terminated_tasks = 4;
+
+    // Tasks that are terminated and updates acked.
+    repeated Task completed_tasks = 5;
+  }
+
   optional Type type = 1;
 
   optional GetHealth get_health = 2;
@@ -220,4 +243,5 @@ message Response {
   optional GetContainers get_containers = 10;
   optional GetFrameworks get_frameworks = 11;
   optional GetExecutors get_executors = 12;
+  optional GetTasks get_tasks = 13;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mesos/blob/d8e4b05b/include/mesos/v1/agent/agent.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/agent/agent.proto 
b/include/mesos/v1/agent/agent.proto
index 052f942..699a17b 100644
--- a/include/mesos/v1/agent/agent.proto
+++ b/include/mesos/v1/agent/agent.proto
@@ -52,6 +52,7 @@ message Call {
     GET_CONTAINERS = 10;
     GET_FRAMEWORKS = 11;    // Retrieves the information about known 
frameworks.
     GET_EXECUTORS = 12;     // Retrieves the information about known executors.
+    GET_TASKS = 13;         // Retrieves the information about known tasks.
   }
 
   // Provides a snapshot of the current metrics tracked by the agent.
@@ -124,6 +125,7 @@ message Response {
     GET_CONTAINERS = 9;
     GET_FRAMEWORKS = 10;           // See 'GetFrameworks' below.
     GET_EXECUTORS = 11;            // See 'GetExecutors' below.
+    GET_TASKS = 12;                // See 'GetTasks' below.
   }
 
   // `healthy` would be true if the agent is healthy. Delayed responses are 
also
@@ -207,6 +209,27 @@ message Response {
     repeated Executor completed_executors = 2;
   }
 
+  // Lists information about all the tasks known to the agent at the current
+  // time.
+  message GetTasks {
+    // Tasks that are pending in the agent's queue before an executor is
+    // launched.
+    repeated Task pending_tasks = 1;
+
+    // Tasks that are enqueued for a launched executor that has not yet
+    // registered.
+    repeated Task queued_tasks = 2;
+
+    // Tasks that are running.
+    repeated Task launched_tasks = 3;
+
+    // Tasks that are terminated but pending updates.
+    repeated Task terminated_tasks = 4;
+
+    // Tasks that are terminated and updates acked.
+    repeated Task completed_tasks = 5;
+  }
+
   optional Type type = 1;
 
   optional GetHealth get_health = 2;
@@ -220,4 +243,5 @@ message Response {
   optional GetContainers get_containers = 10;
   optional GetFrameworks get_frameworks = 11;
   optional GetExecutors get_executors = 12;
+  optional GetTasks get_tasks = 13;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mesos/blob/d8e4b05b/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index a242e0b..63968bf 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -395,6 +395,9 @@ Future<Response> Slave::Http::api(
 
     case agent::Call::GET_EXECUTORS:
       return getExecutors(call, principal, acceptType);
+
+    case agent::Call::GET_TASKS:
+      return getTasks(call, principal, acceptType);
   }
 
   UNREACHABLE();
@@ -1227,6 +1230,187 @@ agent::Response::GetExecutors 
Slave::Http::_getExecutors(
 }
 
 
+Future<Response> Slave::Http::getTasks(
+    const agent::Call& call,
+    const Option<string>& principal,
+    ContentType contentType) const
+{
+  CHECK_EQ(agent::Call::GET_TASKS, call.type());
+
+  // Retrieve Approvers for authorizing frameworks and tasks.
+  Future<Owned<ObjectApprover>> frameworksApprover;
+  Future<Owned<ObjectApprover>> tasksApprover;
+  Future<Owned<ObjectApprover>> executorsApprover;
+  if (slave->authorizer.isSome()) {
+    authorization::Subject subject;
+    if (principal.isSome()) {
+      subject.set_value(principal.get());
+    }
+
+    frameworksApprover = slave->authorizer.get()->getObjectApprover(
+        subject, authorization::VIEW_FRAMEWORK);
+
+    tasksApprover = slave->authorizer.get()->getObjectApprover(
+        subject, authorization::VIEW_TASK);
+
+    executorsApprover = slave->authorizer.get()->getObjectApprover(
+        subject, authorization::VIEW_EXECUTOR);
+  } else {
+    frameworksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
+    tasksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
+    executorsApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
+  }
+
+  return collect(frameworksApprover, tasksApprover, executorsApprover)
+    .then(defer(slave->self(),
+      [this, contentType](const tuple<Owned<ObjectApprover>,
+                                      Owned<ObjectApprover>,
+                                      Owned<ObjectApprover>>& approvers)
+        -> Future<Response> {
+      // Get approver from tuple.
+      Owned<ObjectApprover> frameworksApprover;
+      Owned<ObjectApprover> tasksApprover;
+      Owned<ObjectApprover> executorsApprover;
+      tie(frameworksApprover, tasksApprover, executorsApprover) = approvers;
+
+      agent::Response response;
+      response.set_type(agent::Response::GET_TASKS);
+
+      response.mutable_get_tasks()->CopyFrom(
+          _getTasks(frameworksApprover,
+                    tasksApprover,
+                    executorsApprover));
+
+      return OK(serialize(contentType, evolve(response)),
+                stringify(contentType));
+  }));
+}
+
+
+agent::Response::GetTasks Slave::Http::_getTasks(
+    const Owned<ObjectApprover>& frameworksApprover,
+    const Owned<ObjectApprover>& tasksApprover,
+    const Owned<ObjectApprover>& executorsApprover) const
+{
+  // Construct framework list with both active and completed frameworks.
+  vector<const Framework*> frameworks;
+  foreachvalue (Framework* framework, slave->frameworks) {
+    // Skip unauthorized frameworks.
+    if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+      continue;
+    }
+
+    frameworks.push_back(framework);
+  }
+
+  foreach (const Owned<Framework>& framework, slave->completedFrameworks) {
+    // Skip unauthorized frameworks.
+    if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+      continue;
+    }
+
+    frameworks.push_back(framework.get());
+  }
+
+  // Construct executor list with both active and completed executors.
+  hashmap<const Executor*, const Framework*> executors;
+  foreach (const Framework* framework, frameworks) {
+    foreachvalue (Executor* executor, framework->executors) {
+      // Skip unauthorized executors.
+      if (!approveViewExecutorInfo(executorsApprover,
+                                   executor->info,
+                                   framework->info)) {
+        continue;
+      }
+
+      executors.put(executor, framework);
+    }
+
+    foreach (const Owned<Executor>& executor, framework->completedExecutors) {
+      // Skip unauthorized executors.
+      if (!approveViewExecutorInfo(executorsApprover,
+                                   executor->info,
+                                   framework->info)) {
+        continue;
+      }
+
+      executors.put(executor.get(), framework);
+    }
+  }
+
+  agent::Response::GetTasks getTasks;
+
+  foreach (const Framework* framework, frameworks) {
+    // Pending tasks.
+    typedef hashmap<TaskID, TaskInfo> TaskMap;
+    foreachvalue (const TaskMap& taskInfos, framework->pending) {
+      foreachvalue (const TaskInfo& taskInfo, taskInfos) {
+        // Skip unauthorized tasks.
+        if (!approveViewTaskInfo(tasksApprover, taskInfo, framework->info)) {
+          continue;
+        }
+
+        const Task& task =
+          protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
+
+        getTasks.add_pending_tasks()->CopyFrom(task);
+      }
+    }
+  }
+
+  foreachpair (const Executor* executor,
+               const Framework* framework,
+               executors) {
+    // Queued tasks.
+    foreach (const TaskInfo& taskInfo, executor->queuedTasks.values()) {
+      // Skip unauthorized tasks.
+      if (!approveViewTaskInfo(tasksApprover, taskInfo, framework->info)) {
+        continue;
+      }
+
+      const Task& task =
+        protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
+
+      getTasks.add_queued_tasks()->CopyFrom(task);
+    }
+
+    // Launched tasks.
+    foreach (Task* task, executor->launchedTasks.values()) {
+      CHECK_NOTNULL(task);
+      // Skip unauthorized tasks.
+      if (!approveViewTask(tasksApprover, *task, framework->info)) {
+        continue;
+      }
+
+      getTasks.add_launched_tasks()->CopyFrom(*task);
+    }
+
+    // Terminated tasks.
+    foreach (Task* task, executor->terminatedTasks.values()) {
+      CHECK_NOTNULL(task);
+      // Skip unauthorized tasks.
+      if (!approveViewTask(tasksApprover, *task, framework->info)) {
+        continue;
+      }
+
+      getTasks.add_terminated_tasks()->CopyFrom(*task);
+    }
+
+    // Completed tasks.
+    foreach (const std::shared_ptr<Task>& task, executor->completedTasks) {
+      // Skip unauthorized tasks.
+      if (!approveViewTask(tasksApprover, *task.get(), framework->info)) {
+        continue;
+      }
+
+      getTasks.add_completed_tasks()->CopyFrom(*task);
+    }
+  }
+
+  return getTasks;
+}
+
+
 string Slave::Http::STATISTICS_HELP()
 {
   return HELP(

http://git-wip-us.apache.org/repos/asf/mesos/blob/d8e4b05b/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 48cf77d..4995c84 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -566,6 +566,16 @@ private:
         const process::Owned<ObjectApprover>& frameworksApprover,
         const process::Owned<ObjectApprover>& executorsApprover) const;
 
+    process::Future<process::http::Response> getTasks(
+        const mesos::agent::Call& call,
+        const Option<std::string>& principal,
+        ContentType contentType) const;
+
+    mesos::agent::Response::GetTasks _getTasks(
+        const process::Owned<ObjectApprover>& frameworksApprover,
+        const process::Owned<ObjectApprover>& tasksApprover,
+        const process::Owned<ObjectApprover>& executorsApprover) const;
+
     Slave* slave;
 
     // Used to rate limit the statistics endpoint.

http://git-wip-us.apache.org/repos/asf/mesos/blob/d8e4b05b/src/slave/validation.cpp
----------------------------------------------------------------------
diff --git a/src/slave/validation.cpp b/src/slave/validation.cpp
index 717169a..a9f3182 100644
--- a/src/slave/validation.cpp
+++ b/src/slave/validation.cpp
@@ -96,6 +96,9 @@ Option<Error> validate(
 
     case mesos::agent::Call::GET_EXECUTORS:
       return None();
+
+    case mesos::agent::Call::GET_TASKS:
+      return None();
   }
 
   UNREACHABLE();

Reply via email to