Model pending tasks in the Master's metrics and JSON.

Review: https://reviews.apache.org/r/24515


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/85303d17
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/85303d17
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/85303d17

Branch: refs/heads/master
Commit: 85303d17e264fdceb9dd814018384e8759aa93b7
Parents: 3ff1180
Author: Benjamin Mahler <bmah...@twitter.com>
Authored: Fri Aug 8 15:46:08 2014 -0700
Committer: Benjamin Mahler <bmah...@twitter.com>
Committed: Wed Aug 13 11:54:23 2014 -0700

----------------------------------------------------------------------
 src/Makefile.am                 |   3 +-
 src/common/http.cpp             |  54 +++++++++++++----
 src/common/http.hpp             |  15 +++++
 src/master/http.cpp             |   7 +++
 src/master/master.cpp           |  21 ++++++-
 src/tests/common/http_tests.cpp | 113 +++++++++++++++++++++++++++++++++++
 6 files changed, 200 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 60f89ed..0ac95b4 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1099,7 +1099,8 @@ mesos_tests_SOURCES =                             \
   tests/status_update_manager_tests.cpp                \
   tests/utils.cpp                              \
   tests/values_tests.cpp                       \
-  tests/zookeeper_url_tests.cpp
+  tests/zookeeper_url_tests.cpp                        \
+  tests/common/http_tests.cpp
 
 mesos_tests_CPPFLAGS = $(MESOS_CPPFLAGS)
 mesos_tests_CPPFLAGS += -DSOURCE_DIR=\"$(abs_top_srcdir)\"

http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/common/http.cpp
----------------------------------------------------------------------
diff --git a/src/common/http.cpp b/src/common/http.cpp
index d27fe21..58050e9 100644
--- a/src/common/http.cpp
+++ b/src/common/http.cpp
@@ -16,12 +16,8 @@
  * limitations under the License.
  */
 
-#include <map>
-#include <string>
+#include <vector>
 
-#include <glog/logging.h>
-
-#include <mesos/mesos.hpp>
 #include <mesos/resources.hpp>
 
 #include <stout/foreach.hpp>
@@ -32,14 +28,13 @@
 
 #include "messages/messages.hpp"
 
-using std::map;
-using std::string;
+using std::vector;
 
 namespace mesos {
 namespace internal {
 
-// TODO(bmahler): Kill these in favor of automatic Proto->JSON Conversion (when
-// it becomes available).
+// TODO(bmahler): Kill these in favor of automatic Proto->JSON
+// Conversion (when it becomes available).
 
 JSON::Object model(const Resources& resources)
 {
@@ -118,7 +113,13 @@ JSON::Object model(const Task& task)
   object.values["id"] = task.task_id().value();
   object.values["name"] = task.name();
   object.values["framework_id"] = task.framework_id().value();
-  object.values["executor_id"] = task.executor_id().value();
+
+  if (task.has_executor_id()) {
+    object.values["executor_id"] = task.executor_id().value();
+  } else {
+    object.values["executor_id"] = "";
+  }
+
   object.values["slave_id"] = task.slave_id().value();
   object.values["state"] = TaskState_Name(task.state());
   object.values["resources"] = model(task.resources());
@@ -132,5 +133,38 @@ JSON::Object model(const Task& task)
   return object;
 }
 
+
+// TODO(bmahler): Expose the executor name / source.
+JSON::Object model(
+    const TaskInfo& task,
+    const FrameworkID& frameworkId,
+    const TaskState& state,
+    const vector<TaskStatus>& statuses)
+{
+  JSON::Object object;
+  object.values["id"] = task.task_id().value();
+  object.values["name"] = task.name();
+  object.values["framework_id"] = frameworkId.value();
+
+  if (task.has_executor()) {
+    object.values["executor_id"] = task.executor().executor_id().value();
+  } else {
+    object.values["executor_id"] = "";
+  }
+
+  object.values["slave_id"] = task.slave_id().value();
+  object.values["state"] = TaskState_Name(state);
+  object.values["resources"] = model(task.resources());
+
+  JSON::Array array;
+  foreach (const TaskStatus& status, statuses) {
+    array.values.push_back(model(status));
+  }
+  object.values["statuses"] = array;
+
+  return object;
+}
+
+
 }  // namespace internal {
 }  // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/common/http.hpp
----------------------------------------------------------------------
diff --git a/src/common/http.hpp b/src/common/http.hpp
index 8216401..afce7fe 100644
--- a/src/common/http.hpp
+++ b/src/common/http.hpp
@@ -19,18 +19,33 @@
 #ifndef __COMMON_HTTP_HPP__
 #define __COMMON_HTTP_HPP__
 
+#include <vector>
+
+#include <mesos/mesos.hpp>
+
 #include <stout/json.hpp>
 
 namespace mesos {
+
 class Resources;
 
 namespace internal {
+
 class Attributes;
 class Task;
 
+
 JSON::Object model(const Resources& resources);
 JSON::Object model(const Attributes& attributes);
+
+// These are the two identical ways to model a task, depending on
+// whether you have a 'Task' or a 'TaskInfo' available.
 JSON::Object model(const Task& task);
+JSON::Object model(
+    const TaskInfo& task,
+    const FrameworkID& frameworkId,
+    const TaskState& state,
+    const std::vector<TaskStatus>& statuses);
 
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 9317a95..6dd11fe 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -127,6 +127,12 @@ JSON::Object model(const Framework& framework)
   // Model all of the tasks associated with a framework.
   {
     JSON::Array array;
+
+    foreachvalue (const TaskInfo& task, framework.pendingTasks) {
+      vector<TaskStatus> statuses;
+      array.values.push_back(model(task, framework.id, TASK_STAGING, statuses));
+    }
+
     foreachvalue (Task* task, framework.tasks) {
       array.values.push_back(model(*task));
     }
@@ -176,6 +182,7 @@ JSON::Object model(const Slave& slave)
   return object;
 }
 
+
 // Returns a JSON object modeled after a Role.
 JSON::Object model(const Role& role)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 72494b5..f40a1cd 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2155,6 +2155,9 @@ void Master::launchTasks(
           TASK_LOST,
           "Task launched with invalid offers: " + error.get().message);
 
+      metrics.tasks_lost++;
+      stats.tasks[TASK_LOST]++;
+
       forward(update, UPID(), framework);
     }
     return;
@@ -2177,6 +2180,8 @@ void Master::launchTasks(
     // NOTE: We need to do this here after validation because of the
     // way task validators work.
     framework->pendingTasks[task.task_id()] = task;
+
+    stats.tasks[TASK_STAGING]++;
   }
 
   // Wait for all the tasks to be validated.
@@ -2322,8 +2327,6 @@ void Master::launchTask(
   message.mutable_task()->MergeFrom(task);
   send(slave->pid, message);
 
-  stats.tasks[TASK_STAGING]++;
-
   return;
 }
 
@@ -2361,6 +2364,9 @@ void Master::_launchTasks(
           TASK_LOST,
           (slave == NULL ? "Slave removed" : "Slave disconnected"));
 
+      metrics.tasks_lost++;
+      stats.tasks[TASK_LOST]++;
+
       forward(update, UPID(), framework);
     }
 
@@ -2397,6 +2403,9 @@ void Master::_launchTasks(
           TASK_LOST,
           error);
 
+      metrics.tasks_lost++;
+      stats.tasks[TASK_LOST]++;
+
       forward(update, UPID(), framework);
 
       continue;
@@ -2423,6 +2432,9 @@ void Master::_launchTasks(
           TASK_LOST,
           error);
 
+      metrics.tasks_lost++;
+      stats.tasks[TASK_LOST]++;
+
       forward(update, UPID(), framework);
 
       continue;
@@ -4471,6 +4483,11 @@ double Master::_tasks_staging()
 {
   double count = 0.0;
 
+  // Add the tasks pending validation / authorization.
+  foreachvalue (Framework* framework, frameworks.registered) {
+    count += framework->pendingTasks.size();
+  }
+
   foreachvalue (Slave* slave, slaves.registered) {
     typedef hashmap<TaskID, Task*> TaskMap;
     foreachvalue (const TaskMap& tasks, slave->tasks) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/tests/common/http_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/common/http_tests.cpp b/src/tests/common/http_tests.cpp
new file mode 100644
index 0000000..5fa51bf
--- /dev/null
+++ b/src/tests/common/http_tests.cpp
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include <vector>
+
+#include <mesos/mesos.hpp>
+
+#include <stout/gtest.hpp>
+#include <stout/json.hpp>
+#include <stout/stringify.hpp>
+
+#include "common/http.hpp"
+#include "common/protobuf_utils.hpp"
+
+#include "messages/messages.hpp"
+
+using std::vector;
+
+using namespace mesos;
+using namespace mesos::internal;
+
+// TODO(bmahler): Add tests for other JSON models.
+
+// This test ensures we don't break the API when it comes to JSON
+// representation of tasks. Also, we want to ensure that tasks are
+// modeled the same way when using 'Task' vs. 'TaskInfo'.
+TEST(HTTP, ModelTask)
+{
+  TaskID taskId;
+  taskId.set_value("t");
+
+  SlaveID slaveId;
+  slaveId.set_value("s");
+
+  ExecutorID executorId;
+  executorId.set_value("e");
+
+  FrameworkID frameworkId;
+  frameworkId.set_value("f");
+
+  TaskState state = TASK_RUNNING;
+
+  vector<TaskStatus> statuses;
+
+  TaskStatus status;
+  status.mutable_task_id()->CopyFrom(taskId);
+  status.set_state(state);
+  status.mutable_slave_id()->CopyFrom(slaveId);
+  status.mutable_executor_id()->CopyFrom(executorId);
+  status.set_timestamp(0.0);
+
+  statuses.push_back(status);
+
+  TaskInfo task;
+  task.set_name("task");
+  task.mutable_task_id()->CopyFrom(taskId);
+  task.mutable_slave_id()->CopyFrom(slaveId);
+  task.mutable_command()->set_value("echo hello");
+
+  Task task_ = protobuf::createTask(task, state, executorId, frameworkId);
+  task_.add_statuses()->CopyFrom(statuses[0]);
+
+  JSON::Value object = model(task, frameworkId, state, statuses);
+  JSON::Value object_ = model(task_);
+
+  Try<JSON::Value> expected = JSON::parse(
+      "{"
+      "  \"executor_id\":\"\","
+      "  \"framework_id\":\"f\","
+      "  \"id\":\"t\","
+      "  \"name\":\"task\","
+      "  \"resources\":"
+      "  {"
+      "    \"cpus\":0,"
+      "    \"disk\":0,"
+      "    \"mem\":0"
+      "  },"
+      "  \"slave_id\":\"s\","
+      "  \"state\":\"TASK_RUNNING\","
+      "  \"statuses\":"
+      "  ["
+      "    {"
+      "      \"state\":\"TASK_RUNNING\","
+      "      \"timestamp\":0"
+      "    }"
+      "  ]"
+      "}");
+
+  ASSERT_SOME(expected);
+
+  EXPECT_EQ(expected.get(), object);
+  EXPECT_EQ(expected.get(), object_);
+
+  // Ensure both are modeled the same.
+  EXPECT_EQ(object, object_);
+}

Reply via email to