Repository: mesos Updated Branches: refs/heads/master 020763b7a -> 76b746682
Implemented v1 operator API GET_FRAMEWORK call. Review: https://reviews.apache.org/r/49137/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/76b74668 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/76b74668 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/76b74668 Branch: refs/heads/master Commit: 76b7466829c807f4d8ccb0d9a31c1d030ac64931 Parents: b00f870 Author: zhou xing <xingz...@cn.ibm.com> Authored: Tue Jun 28 12:02:53 2016 -0700 Committer: Vinod Kone <vinodk...@gmail.com> Committed: Tue Jun 28 13:11:14 2016 -0700 ---------------------------------------------------------------------- src/master/http.cpp | 174 ++++++++++++++++++++++++++++++++++++++++++- src/master/master.hpp | 5 ++ src/tests/api_tests.cpp | 46 ++++++++++++ 3 files changed, 224 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/76b74668/src/master/http.cpp ---------------------------------------------------------------------- diff --git a/src/master/http.cpp b/src/master/http.cpp index e827e69..0f9c2b9 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -544,7 +544,7 @@ Future<Response> Master::Http::api( return getAgents(call, principal, acceptType); case mesos::master::Call::GET_FRAMEWORKS: - return NotImplemented(); + return getFrameworks(call, principal, acceptType); case mesos::master::Call::GET_TASKS: return getTasks(call, principal, acceptType); @@ -1215,6 +1215,178 @@ Future<Response> Master::Http::frameworks( } +mesos::master::Response::GetFrameworks::Framework model( + const Framework& framework, + const Owned<ObjectApprover>& tasksApprover, + const Owned<ObjectApprover>& executorsApprover) +{ + mesos::master::Response::GetFrameworks::Framework _framework; + + _framework.mutable_framework_info()->CopyFrom(framework.info); + + _framework.set_active(framework.active); + _framework.set_connected(framework.connected); + + int64_t time = framework.registeredTime.duration().ns(); + if (time != 0) { + _framework.mutable_registered_time()->set_nanoseconds(time); + } + + time = framework.unregisteredTime.duration().ns(); + if (time != 0) { + _framework.mutable_unregistered_time()->set_nanoseconds(time); + } + + time = framework.reregisteredTime.duration().ns(); + if (time != 0) { + _framework.mutable_reregistered_time()->set_nanoseconds(time); + } + + foreachvalue (const TaskInfo& taskInfo, framework.pendingTasks) { + // Skip unauthorized tasks. + if (!approveViewTaskInfo(tasksApprover, taskInfo, framework.info)) { + continue; + } + + _framework.mutable_pending_tasks()->Add()->CopyFrom(taskInfo); + } + + foreachvalue (const Task* task, framework.tasks) { + // Skip unauthorized tasks. + if (!approveViewTask(tasksApprover, *task, framework.info)) { + continue; + } + + _framework.mutable_tasks()->Add()->CopyFrom(*task); + } + + foreach (const std::shared_ptr<Task>& task, framework.completedTasks) { + // Skip unauthorized tasks. + if (!approveViewTask(tasksApprover, *task.get(), framework.info)) { + continue; + } + + _framework.mutable_completed_tasks()->Add()->CopyFrom(*task.get()); + } + + foreachpair (const SlaveID& slaveId, + const auto& executorsMap, + framework.executors) { + foreachvalue (const ExecutorInfo& info, executorsMap) { + // Skip unauthorized executors. + if (!approveViewExecutorInfo(executorsApprover, + info, + framework.info)) { + continue; + } + + mesos::master::Response::GetFrameworks::Executor executor; + executor.mutable_info()->CopyFrom(executor); + executor.mutable_slave_id()->set_value(slaveId.value()); + + _framework.mutable_executors()->Add()->CopyFrom(executor); + } + } + + foreach (const Offer* offer, framework.offers) { + _framework.mutable_offers()->Add()->CopyFrom(*offer); + } + + foreach (const Resource& resource, framework.totalUsedResources) { + _framework.mutable_allocated_resources()->Add()->CopyFrom(resource); + } + + foreach (const Resource& resource, framework.totalOfferedResources) { + _framework.mutable_offered_resources()->Add()->CopyFrom(resource); + } + + return _framework; +} + + +Future<Response> Master::Http::getFrameworks( + const mesos::master::Call& call, + const Option<string>& principal, + ContentType contentType) const +{ + CHECK_EQ(mesos::master::Call::GET_FRAMEWORKS, call.type()); + + // Retrieve `ObjectApprover`s for authorizing frameworks and tasks. + Future<Owned<ObjectApprover>> frameworksApprover; + Future<Owned<ObjectApprover>> tasksApprover; + Future<Owned<ObjectApprover>> executorsApprover; + + if (master->authorizer.isSome()) { + authorization::Subject subject; + if (principal.isSome()) { + subject.set_value(principal.get()); + } + + frameworksApprover = master->authorizer.get()->getObjectApprover( + subject, authorization::VIEW_FRAMEWORK); + + tasksApprover = master->authorizer.get()->getObjectApprover( + subject, authorization::VIEW_TASK); + + executorsApprover = master->authorizer.get()->getObjectApprover( + subject, authorization::VIEW_EXECUTOR); + } else { + frameworksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover()); + tasksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover()); + executorsApprover = Owned<ObjectApprover>(new AcceptingObjectApprover()); + } + + return collect(frameworksApprover, tasksApprover, executorsApprover) + .then(defer(master->self(), + [=](const tuple<Owned<ObjectApprover>, + Owned<ObjectApprover>, + Owned<ObjectApprover>>& approvers) -> Response { + // Get approver from tuple. + Owned<ObjectApprover> frameworksApprover; + Owned<ObjectApprover> tasksApprover; + Owned<ObjectApprover> executorsApprover; + tie(frameworksApprover, tasksApprover, executorsApprover) = approvers; + + mesos::master::Response response; + response.set_type(mesos::master::Response::GET_FRAMEWORKS); + + foreachvalue (const Framework* framework, + master->frameworks.registered) { + // Skip unauthorized frameworks. + if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) { + continue; + } + + response.mutable_get_frameworks()->add_frameworks()->CopyFrom( + model(*framework, tasksApprover, executorsApprover)); + } + + foreach (const std::shared_ptr<Framework>& framework, + master->frameworks.completed) { + // Skip unauthorized frameworks. + if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) { + continue; + } + + response.mutable_get_frameworks()->add_completed_frameworks() + ->CopyFrom(model(*framework, tasksApprover, executorsApprover)); + } + + foreachvalue (const Slave* slave, master->slaves.registered) { + foreachkey (const FrameworkID& frameworkId, slave->tasks) { + if (!master->frameworks.registered.contains(frameworkId)) { + response.mutable_get_frameworks()->add_unregistered_frameworks() + ->set_value(frameworkId.value()); + } + } + } + + return OK(serialize(contentType, evolve(response)), + stringify(contentType)); + })); +} + + string Master::Http::FLAGS_HELP() { return HELP( http://git-wip-us.apache.org/repos/asf/mesos/blob/76b74668/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 5c43f03..111c522 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -1457,6 +1457,11 @@ private: const Option<std::string>& principal, ContentType contentType) const; + process::Future<process::http::Response> getFrameworks( + const mesos::master::Call& call, + const Option<std::string>& principal, + ContentType contentType) const; + Master* master; // NOTE: The quota specific pieces of the Operator API are factored http://git-wip-us.apache.org/repos/asf/mesos/blob/76b74668/src/tests/api_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp index fd51fe9..136a9fb 100644 --- a/src/tests/api_tests.cpp +++ b/src/tests/api_tests.cpp @@ -212,6 +212,52 @@ TEST_P(MasterAPITest, GetFlags) } +TEST_P(MasterAPITest, GetFrameworks) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO; + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL); + + Future<Nothing> registered; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureSatisfy(®istered)); + + driver.start(); + + AWAIT_READY(registered); + + v1::master::Call v1Call; + v1Call.set_type(v1::master::Call::GET_FRAMEWORKS); + + ContentType contentType = GetParam(); + + Future<v1::master::Response> v1Response = + post(master.get()->pid, v1Call, contentType); + + AWAIT_READY(v1Response); + ASSERT_TRUE(v1Response.get().IsInitialized()); + ASSERT_EQ(v1::master::Response::GET_FRAMEWORKS, v1Response.get().type()); + + v1::master::Response::GetFrameworks frameworks = + v1Response.get().get_frameworks(); + + ASSERT_EQ(1, frameworks.frameworks_size()); + ASSERT_EQ("default", frameworks.frameworks(0).framework_info().name()); + ASSERT_EQ("*", frameworks.frameworks(0).framework_info().role()); + ASSERT_FALSE(frameworks.frameworks(0).framework_info().checkpoint()); + ASSERT_TRUE(frameworks.frameworks(0).active()); + ASSERT_TRUE(frameworks.frameworks(0).connected()); + + driver.stop(); + driver.join(); +} + + TEST_P(MasterAPITest, GetHealth) { Try<Owned<cluster::Master>> master = this->StartMaster();