Model pending tasks in the Master's metrics and JSON. Review: https://reviews.apache.org/r/24515
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/85303d17 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/85303d17 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/85303d17 Branch: refs/heads/master Commit: 85303d17e264fdceb9dd814018384e8759aa93b7 Parents: 3ff1180 Author: Benjamin Mahler <bmah...@twitter.com> Authored: Fri Aug 8 15:46:08 2014 -0700 Committer: Benjamin Mahler <bmah...@twitter.com> Committed: Wed Aug 13 11:54:23 2014 -0700 ---------------------------------------------------------------------- src/Makefile.am | 3 +- src/common/http.cpp | 54 +++++++++++++---- src/common/http.hpp | 15 +++++ src/master/http.cpp | 7 +++ src/master/master.cpp | 21 ++++++- src/tests/common/http_tests.cpp | 113 +++++++++++++++++++++++++++++++++++ 6 files changed, 200 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index 60f89ed..0ac95b4 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1099,7 +1099,8 @@ mesos_tests_SOURCES = \ tests/status_update_manager_tests.cpp \ tests/utils.cpp \ tests/values_tests.cpp \ - tests/zookeeper_url_tests.cpp + tests/zookeeper_url_tests.cpp \ + tests/common/http_tests.cpp mesos_tests_CPPFLAGS = $(MESOS_CPPFLAGS) mesos_tests_CPPFLAGS += -DSOURCE_DIR=\"$(abs_top_srcdir)\" http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/common/http.cpp ---------------------------------------------------------------------- diff --git a/src/common/http.cpp b/src/common/http.cpp index d27fe21..58050e9 100644 --- a/src/common/http.cpp +++ b/src/common/http.cpp @@ -16,12 +16,8 @@ * limitations under the License. */ -#include <map> -#include <string> +#include <vector> -#include <glog/logging.h> - -#include <mesos/mesos.hpp> #include <mesos/resources.hpp> #include <stout/foreach.hpp> @@ -32,14 +28,13 @@ #include "messages/messages.hpp" -using std::map; -using std::string; +using std::vector; namespace mesos { namespace internal { -// TODO(bmahler): Kill these in favor of automatic Proto->JSON Conversion (when -// it becomes available). +// TODO(bmahler): Kill these in favor of automatic Proto->JSON +// Conversion (when it becomes available). JSON::Object model(const Resources& resources) { @@ -118,7 +113,13 @@ JSON::Object model(const Task& task) object.values["id"] = task.task_id().value(); object.values["name"] = task.name(); object.values["framework_id"] = task.framework_id().value(); - object.values["executor_id"] = task.executor_id().value(); + + if (task.has_executor_id()) { + object.values["executor_id"] = task.executor_id().value(); + } else { + object.values["executor_id"] = ""; + } + object.values["slave_id"] = task.slave_id().value(); object.values["state"] = TaskState_Name(task.state()); object.values["resources"] = model(task.resources()); @@ -132,5 +133,38 @@ JSON::Object model(const Task& task) return object; } + +// TODO(bmahler): Expose the executor name / source. +JSON::Object model( + const TaskInfo& task, + const FrameworkID& frameworkId, + const TaskState& state, + const vector<TaskStatus>& statuses) +{ + JSON::Object object; + object.values["id"] = task.task_id().value(); + object.values["name"] = task.name(); + object.values["framework_id"] = frameworkId.value(); + + if (task.has_executor()) { + object.values["executor_id"] = task.executor().executor_id().value(); + } else { + object.values["executor_id"] = ""; + } + + object.values["slave_id"] = task.slave_id().value(); + object.values["state"] = TaskState_Name(state); + object.values["resources"] = model(task.resources()); + + JSON::Array array; + foreach (const TaskStatus& status, statuses) { + array.values.push_back(model(status)); + } + object.values["statuses"] = array; + + return object; +} + + } // namespace internal { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/common/http.hpp ---------------------------------------------------------------------- diff --git a/src/common/http.hpp b/src/common/http.hpp index 8216401..afce7fe 100644 --- a/src/common/http.hpp +++ b/src/common/http.hpp @@ -19,18 +19,33 @@ #ifndef __COMMON_HTTP_HPP__ #define __COMMON_HTTP_HPP__ +#include <vector> + +#include <mesos/mesos.hpp> + #include <stout/json.hpp> namespace mesos { + class Resources; namespace internal { + class Attributes; class Task; + JSON::Object model(const Resources& resources); JSON::Object model(const Attributes& attributes); + +// These are the two identical ways to model a task, depending on +// whether you have a 'Task' or a 'TaskInfo' available. JSON::Object model(const Task& task); +JSON::Object model( + const TaskInfo& task, + const FrameworkID& frameworkId, + const TaskState& state, + const std::vector<TaskStatus>& statuses); } // namespace internal { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/master/http.cpp ---------------------------------------------------------------------- diff --git a/src/master/http.cpp b/src/master/http.cpp index 9317a95..6dd11fe 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -127,6 +127,12 @@ JSON::Object model(const Framework& framework) // Model all of the tasks associated with a framework. { JSON::Array array; + + foreachvalue (const TaskInfo& task, framework.pendingTasks) { + vector<TaskStatus> statuses; + array.values.push_back(model(task, framework.id, TASK_STAGING, statuses)); + } + foreachvalue (Task* task, framework.tasks) { array.values.push_back(model(*task)); } @@ -176,6 +182,7 @@ JSON::Object model(const Slave& slave) return object; } + // Returns a JSON object modeled after a Role. JSON::Object model(const Role& role) { http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 72494b5..f40a1cd 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -2155,6 +2155,9 @@ void Master::launchTasks( TASK_LOST, "Task launched with invalid offers: " + error.get().message); + metrics.tasks_lost++; + stats.tasks[TASK_LOST]++; + forward(update, UPID(), framework); } return; @@ -2177,6 +2180,8 @@ void Master::launchTasks( // NOTE: We need to do this here after validation because of the // way task validators work. framework->pendingTasks[task.task_id()] = task; + + stats.tasks[TASK_STAGING]++; } // Wait for all the tasks to be validated. @@ -2322,8 +2327,6 @@ void Master::launchTask( message.mutable_task()->MergeFrom(task); send(slave->pid, message); - stats.tasks[TASK_STAGING]++; - return; } @@ -2361,6 +2364,9 @@ void Master::_launchTasks( TASK_LOST, (slave == NULL ? "Slave removed" : "Slave disconnected")); + metrics.tasks_lost++; + stats.tasks[TASK_LOST]++; + forward(update, UPID(), framework); } @@ -2397,6 +2403,9 @@ void Master::_launchTasks( TASK_LOST, error); + metrics.tasks_lost++; + stats.tasks[TASK_LOST]++; + forward(update, UPID(), framework); continue; @@ -2423,6 +2432,9 @@ void Master::_launchTasks( TASK_LOST, error); + metrics.tasks_lost++; + stats.tasks[TASK_LOST]++; + forward(update, UPID(), framework); continue; @@ -4471,6 +4483,11 @@ double Master::_tasks_staging() { double count = 0.0; + // Add the tasks pending validation / authorization. + foreachvalue (Framework* framework, frameworks.registered) { + count += framework->pendingTasks.size(); + } + foreachvalue (Slave* slave, slaves.registered) { typedef hashmap<TaskID, Task*> TaskMap; foreachvalue (const TaskMap& tasks, slave->tasks) { http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/tests/common/http_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/common/http_tests.cpp b/src/tests/common/http_tests.cpp new file mode 100644 index 0000000..5fa51bf --- /dev/null +++ b/src/tests/common/http_tests.cpp @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <gtest/gtest.h> + +#include <vector> + +#include <mesos/mesos.hpp> + +#include <stout/gtest.hpp> +#include <stout/json.hpp> +#include <stout/stringify.hpp> + +#include "common/http.hpp" +#include "common/protobuf_utils.hpp" + +#include "messages/messages.hpp" + +using std::vector; + +using namespace mesos; +using namespace mesos::internal; + +// TODO(bmahler): Add tests for other JSON models. + +// This test ensures we don't break the API when it comes to JSON +// representation of tasks. Also, we want to ensure that tasks are +// modeled the same way when using 'Task' vs. 'TaskInfo'. +TEST(HTTP, ModelTask) +{ + TaskID taskId; + taskId.set_value("t"); + + SlaveID slaveId; + slaveId.set_value("s"); + + ExecutorID executorId; + executorId.set_value("e"); + + FrameworkID frameworkId; + frameworkId.set_value("f"); + + TaskState state = TASK_RUNNING; + + vector<TaskStatus> statuses; + + TaskStatus status; + status.mutable_task_id()->CopyFrom(taskId); + status.set_state(state); + status.mutable_slave_id()->CopyFrom(slaveId); + status.mutable_executor_id()->CopyFrom(executorId); + status.set_timestamp(0.0); + + statuses.push_back(status); + + TaskInfo task; + task.set_name("task"); + task.mutable_task_id()->CopyFrom(taskId); + task.mutable_slave_id()->CopyFrom(slaveId); + task.mutable_command()->set_value("echo hello"); + + Task task_ = protobuf::createTask(task, state, executorId, frameworkId); + task_.add_statuses()->CopyFrom(statuses[0]); + + JSON::Value object = model(task, frameworkId, state, statuses); + JSON::Value object_ = model(task_); + + Try<JSON::Value> expected = JSON::parse( + "{" + " \"executor_id\":\"\"," + " \"framework_id\":\"f\"," + " \"id\":\"t\"," + " \"name\":\"task\"," + " \"resources\":" + " {" + " \"cpus\":0," + " \"disk\":0," + " \"mem\":0" + " }," + " \"slave_id\":\"s\"," + " \"state\":\"TASK_RUNNING\"," + " \"statuses\":" + " [" + " {" + " \"state\":\"TASK_RUNNING\"," + " \"timestamp\":0" + " }" + " ]" + "}"); + + ASSERT_SOME(expected); + + EXPECT_EQ(expected.get(), object); + EXPECT_EQ(expected.get(), object_); + + // Ensure both are modeled the same. + EXPECT_EQ(object, object_); +}