Updated GetTasks v1 call in master.

The response now distinguishes between active tasks, completed tasks,
pending tasks and orphan tasks to make it easy for clients.
Consequently got rid of limit, offset and order in the Call because
they don't make sense when we have multiple fields in the response.

Review: https://reviews.apache.org/r/49419


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bfead176
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bfead176
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bfead176

Branch: refs/heads/master
Commit: bfead176e4f78314f2368d99f670adcf1d3d3d47
Parents: 9de8d5f
Author: Vinod Kone <vinodk...@gmail.com>
Authored: Wed Jun 29 14:26:42 2016 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Thu Jun 30 16:35:40 2016 -0700

----------------------------------------------------------------------
 include/mesos/master/master.proto    | 54 ++++++++++----------
 include/mesos/v1/master/master.proto | 54 ++++++++++----------
 src/master/http.cpp                  | 83 +++++++++++++++++--------------
 src/master/validation.cpp            |  3 --
 src/tests/api_tests.cpp              | 79 +++++++++++++++++++++++------
 5 files changed, 167 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bfead176/include/mesos/master/master.proto
----------------------------------------------------------------------
diff --git a/include/mesos/master/master.proto b/include/mesos/master/master.proto
index 2e5d6ee..92e68fc 100644
--- a/include/mesos/master/master.proto
+++ b/include/mesos/master/master.proto
@@ -58,7 +58,7 @@ message Call {
 
     GET_AGENTS = 11;
     GET_FRAMEWORKS = 12;
-    GET_TASKS = 13;         // See 'GetTasks' below.
+    GET_TASKS = 13;         // Retrieves the information about all known tasks.
     GET_ROLES = 14;         // Retrieves the information about roles.
 
     GET_WEIGHTS = 15;       // Retrieves the information about role weights.
@@ -117,16 +117,6 @@ message Call {
     optional string length = 3;
   }
 
-  // Call::GetTasks requests 'limit' number of tasks starting
-  // from 'offset'. Tasks are sorted in 'order', either ascending
-  // or descending based on timestamp. If 'limit' is not set,
-  // default value 100 will be used.
-  message GetTasks {
-    optional uint32 limit = 1 [default = 100];
-    optional uint32 offset = 2;
-    optional string order = 3;
-  }
-
   message UpdateWeights {
     repeated WeightInfo weight_infos = 1;
   }
@@ -193,17 +183,16 @@ message Call {
   optional SetLoggingLevel set_logging_level = 3;
   optional ListFiles list_files = 4;
   optional ReadFile read_file = 5;
-  optional GetTasks get_tasks = 6;
-  optional UpdateWeights update_weights = 7;
-  optional ReserveResources reserve_resources = 8;
-  optional UnreserveResources unreserve_resources = 9;
-  optional CreateVolumes create_volumes = 10;
-  optional DestroyVolumes destroy_volumes = 11;
-  optional UpdateMaintenanceSchedule update_maintenance_schedule = 12;
-  optional StartMaintenance start_maintenance = 13;
-  optional StopMaintenance stop_maintenance  = 14;
-  optional SetQuota set_quota = 15;
-  optional RemoveQuota remove_quota = 16;
+  optional UpdateWeights update_weights = 6;
+  optional ReserveResources reserve_resources = 7;
+  optional UnreserveResources unreserve_resources = 8;
+  optional CreateVolumes create_volumes = 9;
+  optional DestroyVolumes destroy_volumes = 10;
+  optional UpdateMaintenanceSchedule update_maintenance_schedule = 11;
+  optional StartMaintenance start_maintenance = 12;
+  optional StopMaintenance stop_maintenance  = 13;
+  optional SetQuota set_quota = 14;
+  optional RemoveQuota remove_quota = 15;
 }
 
 
@@ -347,10 +336,25 @@ message Response {
     repeated FrameworkID unregistered_frameworks = 3;
   }
 
-  // Lists both running and completed tasks from all active and completed
-  // frameworks.
+  // Lists information about all the tasks known to the master at the current
+  // time. Note that there might be tasks unknown to the master running on
+  // partitioned or unsubscribed agents.
   message GetTasks {
-    repeated Task tasks = 1;
+    // Tasks that are enqueued on the master waiting (e.g., authorizing)
+    // to be launched.
+    repeated Task pending_tasks = 1;
+
+    // Tasks that have been forwarded to the agent for launch. This includes
+    // tasks that are running and reached terminal state.
+    repeated Task tasks = 2;
+
+    // Tasks that have reached terminal state and have all their updates
+    // acknowledged by the scheduler.
+    repeated Task completed_tasks = 3;
+
+    // Tasks belonging to frameworks that have not yet re-subscribed with
+    // master (e.g., immediately after master failover).
+    repeated Task orphan_tasks = 4;
   }
 
   // Provides information about every role that is on the role whitelist (if

http://git-wip-us.apache.org/repos/asf/mesos/blob/bfead176/include/mesos/v1/master/master.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/master/master.proto b/include/mesos/v1/master/master.proto
index 93157d5..7ef7535 100644
--- a/include/mesos/v1/master/master.proto
+++ b/include/mesos/v1/master/master.proto
@@ -58,7 +58,7 @@ message Call {
 
     GET_AGENTS = 11;
     GET_FRAMEWORKS = 12;
-    GET_TASKS = 13;         // See 'GetTasks' below.
+    GET_TASKS = 13;         // Retrieves the information about all known tasks.
     GET_ROLES = 14;         // Retrieves the information about roles.
 
     GET_WEIGHTS = 15;       // Retrieves the information about role weights.
@@ -117,16 +117,6 @@ message Call {
     optional string length = 3;
   }
 
-  // Call::GetTasks requests 'limit' number of tasks starting
-  // from 'offset'. Tasks are sorted in 'order', either ascending
-  // or descending based on timestamp. If 'limit' is not set,
-  // default value 100 will be used.
-  message GetTasks {
-    optional uint32 limit = 1 [default = 100];
-    optional uint32 offset = 2;
-    optional string order = 3;
-  }
-
   message UpdateWeights {
     repeated WeightInfo weight_infos = 1;
   }
@@ -194,17 +184,16 @@ message Call {
   optional SetLoggingLevel set_logging_level = 3;
   optional ListFiles list_files = 4;
   optional ReadFile read_file = 5;
-  optional GetTasks get_tasks = 6;
-  optional UpdateWeights update_weights = 7;
-  optional ReserveResources reserve_resources = 8;
-  optional UnreserveResources unreserve_resources = 9;
-  optional CreateVolumes create_volumes = 10;
-  optional DestroyVolumes destroy_volumes = 11;
-  optional UpdateMaintenanceSchedule update_maintenance_schedule = 12;
-  optional StartMaintenance start_maintenance = 13;
-  optional StopMaintenance stop_maintenance  = 14;
-  optional SetQuota set_quota = 15;
-  optional RemoveQuota remove_quota = 16;
+  optional UpdateWeights update_weights = 6;
+  optional ReserveResources reserve_resources = 7;
+  optional UnreserveResources unreserve_resources = 8;
+  optional CreateVolumes create_volumes = 9;
+  optional DestroyVolumes destroy_volumes = 10;
+  optional UpdateMaintenanceSchedule update_maintenance_schedule = 11;
+  optional StartMaintenance start_maintenance = 12;
+  optional StopMaintenance stop_maintenance  = 13;
+  optional SetQuota set_quota = 14;
+  optional RemoveQuota remove_quota = 15;
 }
 
 
@@ -348,10 +337,25 @@ message Response {
     repeated FrameworkID unregistered_frameworks = 3;
   }
 
-  // Lists both running and completed tasks from all active and completed
-  // frameworks.
+  // Lists information about all the tasks known to the master at the current
+  // time. Note that there might be tasks unknown to the master running on
+  // partitioned or unsubscribed agents.
   message GetTasks {
-    repeated Task tasks = 1;
+    // Tasks that are enqueued on the master waiting (e.g., authorizing)
+    // to be launched.
+    repeated Task pending_tasks = 1;
+
+    // Tasks that have been forwarded to the agent for launch. This includes
+    // tasks that are running and reached terminal state.
+    repeated Task tasks = 2;
+
+    // Tasks that have reached terminal state and have all their updates
+    // acknowledged by the scheduler.
+    repeated Task completed_tasks = 3;
+
+    // Tasks belonging to frameworks that have not yet re-subscribed with
+    // master (e.g., immediately after master failover).
+    repeated Task orphan_tasks = 4;
   }
 
   // Provides information about every role that is on the role whitelist (if

http://git-wip-us.apache.org/repos/asf/mesos/blob/bfead176/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 338c29a..0f4a893 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -2434,6 +2434,10 @@ Future<Response> Master::Http::state(
             });
 
         // Model all of the orphan tasks.
+        // TODO(vinod): Need to filter these tasks based on authorization! This
+        // is currently not possible because we don't have `FrameworkInfo` for
+        // these tasks. We need to either store `FrameworkInfo` for orphan
+        // tasks or persist FrameworkInfo of all frameworks in the registry.
         writer->field("orphan_tasks", [this](JSON::ArrayWriter* writer) {
           // Find those orphan tasks.
           foreachvalue (const Slave* slave, master->slaves.registered) {
@@ -2452,6 +2456,8 @@ Future<Response> Master::Http::state(
 
         // Model all currently unregistered frameworks. This can happen
         // when a framework has yet to re-register after master failover.
+        // TODO(vinod): Need to filter these frameworks based on authorization!
+        // See the TODO above for "orphan_tasks" for further details.
         writer->field("unregistered_frameworks", [this](
             JSON::ArrayWriter* writer) {
           // Find unregistered frameworks.
@@ -3271,21 +3277,6 @@ Future<Response> Master::Http::getTasks(
 {
   CHECK_EQ(mesos::master::Call::GET_TASKS, call.type());
 
-  // Get list options (limit and offset).
-
-  // TODO(nnielsen): Currently, formatting errors in offset and/or limit
-  // will silently be ignored. This could be reported to the user instead.
-
-  Result<int> result = call.get_tasks().limit();
-  CHECK_SOME(result);
-  size_t limit = result.get();
-
-  result = call.get_tasks().offset();
-  size_t offset = result.isSome() ? result.get() : 0;
-
-  Option<string> order = call.get_tasks().order();
-  string _order = order.isSome() && (order.get() == "asc") ? "asc" : "des";
-
   // Retrieve Approvers for authorizing frameworks and tasks.
   Future<Owned<ObjectApprover>> frameworksApprover;
   Future<Owned<ObjectApprover>> tasksApprover;
@@ -3336,9 +3327,28 @@ Future<Response> Master::Http::getTasks(
         frameworks.push_back(framework.get());
       }
 
-      // Construct task list with both running and finished tasks.
+      mesos::master::Response response;
+      response.set_type(mesos::master::Response::GET_TASKS);
+
+      mesos::master::Response::GetTasks* getTasks =
+        response.mutable_get_tasks();
+
       vector<const Task*> tasks;
       foreach (const Framework* framework, frameworks) {
+        // Pending tasks.
+        foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
+          // Skip unauthorized tasks.
+          if (!approveViewTaskInfo(tasksApprover, taskInfo, framework->info)) {
+            continue;
+          }
+
+          const Task& task =
+            protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
+
+          getTasks->add_pending_tasks()->CopyFrom(task);
+        }
+
+        // Active tasks.
         foreachvalue (Task* task, framework->tasks) {
           CHECK_NOTNULL(task);
           // Skip unauthorized tasks.
@@ -3346,37 +3356,36 @@ Future<Response> Master::Http::getTasks(
             continue;
           }
 
-          tasks.push_back(task);
+          getTasks->add_tasks()->CopyFrom(*task);
         }
+
+        // Completed tasks.
         foreach (const std::shared_ptr<Task>& task, framework->completedTasks) {
           // Skip unauthorized tasks.
           if (!approveViewTask(tasksApprover, *task.get(), framework->info)) {
             continue;
           }
 
-          tasks.push_back(task.get());
+          getTasks->add_completed_tasks()->CopyFrom(*task);
         }
       }
 
-      // Sort tasks by task status timestamp. Default order is descending.
-      // The earliest timestamp is chosen for comparison when
-      // multiple are present.
-      if (_order == "asc") {
-        sort(tasks.begin(), tasks.end(), TaskComparator::ascending);
-      } else {
-        sort(tasks.begin(), tasks.end(), TaskComparator::descending);
-      }
-
-      mesos::master::Response response;
-      response.set_type(mesos::master::Response::GET_TASKS);
-
-      mesos::master::Response::GetTasks* getTasks =
-        response.mutable_get_tasks();
-
-      // Collect 'limit' number of tasks starting from 'offset'.
-      size_t end = std::min(offset + limit, tasks.size());
-      for (size_t i = offset; i < end; i++) {
-        getTasks->add_tasks()->CopyFrom(*tasks[i]);
+      // Orphan tasks.
+      // TODO(vinod): Need to filter these tasks based on authorization! This
+      // is currently not possible because we don't have `FrameworkInfo` for
+      // these tasks. We need to either store `FrameworkInfo` for orphan
+      // tasks or persist FrameworkInfo of all frameworks in the registry.
+      foreachvalue (const Slave* slave, master->slaves.registered) {
+        typedef hashmap<TaskID, Task*> TaskMap;
+        foreachvalue (const TaskMap& tasks, slave->tasks) {
+          foreachvalue (const Task* task, tasks) {
+            CHECK_NOTNULL(task);
+            if (!master->frameworks.registered.contains(
+                task->framework_id())) {
+              getTasks->add_orphan_tasks()->CopyFrom(*task);
+            }
+          }
+        }
       }
 
       return OK(serialize(contentType, evolve(response)),

http://git-wip-us.apache.org/repos/asf/mesos/blob/bfead176/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 6939d0e..5026afc 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -118,9 +118,6 @@ Option<Error> validate(
       return None();
 
     case mesos::master::Call::GET_TASKS:
-      if (!call.has_get_tasks()) {
-        return Error("Expecting 'get_tasks' to be present");
-      }
       return None();
 
     case mesos::master::Call::GET_ROLES:

http://git-wip-us.apache.org/repos/asf/mesos/blob/bfead176/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index b3127b7..e53c5a8 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -78,6 +78,7 @@ using recordio::Decoder;
 using testing::_;
 using testing::AtMost;
 using testing::DoAll;
+using testing::Eq;
 using testing::Return;
 using testing::WithParamInterface;
 
@@ -334,14 +335,12 @@ TEST_P(MasterAPITest, GetMetrics)
 }
 
 
-// This tests v1 API GetTasks when no task is present.
 TEST_P(MasterAPITest, GetTasksNoRunningTask)
 {
   Try<Owned<cluster::Master>> master = this->StartMaster();
   ASSERT_SOME(master);
 
   v1::master::Call v1Call;
-  v1Call.mutable_get_tasks();
   v1Call.set_type(v1::master::Call::GET_TASKS);
 
   ContentType contentType = GetParam();
@@ -352,11 +351,16 @@ TEST_P(MasterAPITest, GetTasksNoRunningTask)
   AWAIT_READY(v1Response);
   ASSERT_TRUE(v1Response.get().IsInitialized());
   ASSERT_EQ(v1::master::Response::GET_TASKS, v1Response.get().type());
-  ASSERT_EQ(0, v1Response.get().get_tasks().tasks_size());
+
+  ASSERT_EQ(0, v1Response.get().get_tasks().pending_tasks().size());
+  ASSERT_EQ(0, v1Response.get().get_tasks().tasks().size());
+  ASSERT_EQ(0, v1Response.get().get_tasks().completed_tasks().size());
+  ASSERT_EQ(0, v1Response.get().get_tasks().orphan_tasks().size());
 }
 
 
-// This tests v1 API GetTasks with 1 running task.
+// This test verifies that the GetTasks v1 API call returns responses correctly
+// when the task transitions from being active to completed.
 TEST_P(MasterAPITest, GetTasks)
 {
   Try<Owned<cluster::Master>> master = StartMaster();
@@ -393,8 +397,9 @@ TEST_P(MasterAPITest, GetTasks)
   task.mutable_resources()->MergeFrom(offers.get()[0].resources());
   task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
 
+  Future<ExecutorDriver*> execDriver;
   EXPECT_CALL(exec, registered(_, _, _, _))
-    .Times(1);
+    .WillOnce(FutureArg<0>(&execDriver));
 
   EXPECT_CALL(exec, launchTask(_, _))
     .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
@@ -405,28 +410,70 @@ TEST_P(MasterAPITest, GetTasks)
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
+  AWAIT_READY(execDriver);
+
   AWAIT_READY(status);
   EXPECT_EQ(TASK_RUNNING, status.get().state());
   EXPECT_TRUE(status.get().has_executor_id());
   EXPECT_EQ(exec.id, status.get().executor_id());
 
   v1::master::Call v1Call;
-  v1Call.mutable_get_tasks();
   v1Call.set_type(v1::master::Call::GET_TASKS);
 
   ContentType contentType = GetParam();
 
-  Future<v1::master::Response> v1Response =
-    post(master.get()->pid, v1Call, contentType);
+  {
+    Future<v1::master::Response> v1Response =
+      post(master.get()->pid, v1Call, contentType);
 
-  AWAIT_READY(v1Response);
-  ASSERT_TRUE(v1Response.get().IsInitialized());
-  ASSERT_EQ(v1::master::Response::GET_TASKS, v1Response.get().type());
-  ASSERT_EQ(1, v1Response.get().get_tasks().tasks_size());
-  ASSERT_EQ(v1::TaskState::TASK_RUNNING,
-            v1Response.get().get_tasks().tasks(0).state());
-  ASSERT_EQ("test", v1Response.get().get_tasks().tasks(0).name());
-  ASSERT_EQ("1", v1Response.get().get_tasks().tasks(0).task_id().value());
+    AWAIT_READY(v1Response);
+    ASSERT_TRUE(v1Response.get().IsInitialized());
+    ASSERT_EQ(v1::master::Response::GET_TASKS, v1Response.get().type());
+    ASSERT_EQ(1, v1Response.get().get_tasks().tasks().size());
+    ASSERT_EQ(v1::TaskState::TASK_RUNNING,
+              v1Response.get().get_tasks().tasks(0).state());
+    ASSERT_EQ("test", v1Response.get().get_tasks().tasks(0).name());
+    ASSERT_EQ("1", v1Response.get().get_tasks().tasks(0).task_id().value());
+  }
+
+  Future<StatusUpdateAcknowledgementMessage> acknowledgement =
+    FUTURE_PROTOBUF(
+        StatusUpdateAcknowledgementMessage(),
+        _,
+        Eq(slave.get()->pid));
+
+  Future<TaskStatus> status2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status2));
+
+  // Send a terminal update so that the task transitions to completed.
+  TaskStatus status3;
+  status3.mutable_task_id()->CopyFrom(task.task_id());
+  status3.set_state(TASK_FINISHED);
+
+  execDriver.get()->sendStatusUpdate(status3);
+
+  AWAIT_READY(status2);
+  EXPECT_EQ(TASK_FINISHED, status2.get().state());
+
+  AWAIT_READY(acknowledgement);
+
+  {
+    Future<v1::master::Response> v1Response =
+      post(master.get()->pid, v1Call, contentType);
+
+    AWAIT_READY(v1Response);
+    ASSERT_TRUE(v1Response.get().IsInitialized());
+    ASSERT_EQ(v1::master::Response::GET_TASKS, v1Response.get().type());
+    ASSERT_EQ(0, v1Response.get().get_tasks().tasks().size());
+    ASSERT_EQ(1, v1Response.get().get_tasks().completed_tasks().size());
+    ASSERT_EQ(v1::TaskState::TASK_FINISHED,
+              v1Response.get().get_tasks().completed_tasks(0).state());
+    ASSERT_EQ("test", v1Response.get().get_tasks().completed_tasks(0).name());
+    ASSERT_EQ(
+        "1",
+        v1Response.get().get_tasks().completed_tasks(0).task_id().value());
+  }
 
   EXPECT_CALL(exec, shutdown(_))
     .Times(AtMost(1));

Reply via email to