Repository: mesos Updated Branches: refs/heads/master 28ab19b11 -> b4e210678
Fixed a master API bug for agent re-registration after master failover. When the master fails over and a client subscribes to the master before agent re-registration, the master will crash when sending `TASK_ADDED` because the framework info might not have been added to the master yet. This patch fixes this bug. Review: https://reviews.apache.org/r/65774/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f2ec2b28 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f2ec2b28 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f2ec2b28 Branch: refs/heads/master Commit: f2ec2b288e823424b2efe71d62ef90101b7a863f Parents: 28ab19b Author: Chun-Hung Hsiao <chhs...@mesosphere.io> Authored: Fri Feb 23 18:37:12 2018 -0800 Committer: Greg Mann <gregorywm...@gmail.com> Committed: Fri Feb 23 18:37:12 2018 -0800 ---------------------------------------------------------------------- src/master/master.cpp | 98 ++++++++++++++++------------------------------ src/master/master.hpp | 15 +++++-- 2 files changed, 46 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/f2ec2b28/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index e7d5ac6..f2c2dbe 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -10241,8 +10241,18 @@ void Master::updateTask(Task* task, const StatusUpdate& update) task->mutable_statuses(task->statuses_size() - 1)->clear_data(); if (sendSubscribersUpdate && !subscribers.subscribed.empty()) { - subscribers.send(protobuf::master::event::createTaskUpdated( - *task, task->state(), status)); + // If the framework has been removed, the task would have already + // transitioned to `TASK_KILLED` by `removeFramework()`, thus + // `sendSubscribersUpdate` shouldn't have been set to true. + // TODO(chhsiao): This may be changed after MESOS-6608 is resolved. + Framework* framework = getFramework(task->framework_id()); + CHECK_NOTNULL(framework); + + subscribers.send( + protobuf::master::event::createTaskUpdated( + *task, task->state(), status), + framework->info, + *task); } LOG(INFO) << "Updating the state of task " << task->task_id() @@ -11185,57 +11195,25 @@ static bool isValidFailoverTimeout(const FrameworkInfo& frameworkInfo) } -void Master::Subscribers::send(mesos::master::Event&& event) +void Master::Subscribers::send( + mesos::master::Event&& event, + const Option<FrameworkInfo>& frameworkInfo, + const Option<Task>& task) { VLOG(1) << "Notifying all active subscribers about " << event.type() << " event"; - Option<Shared<FrameworkInfo>> frameworkInfo; - Option<Shared<Task>> task; - - // Copy metadata associated with the event if necessary, so that we capture - // the current state before we make asynchronous calls below. - switch (event.type()) { - case mesos::master::Event::TASK_ADDED: { - Framework* framework = - master->getFramework(event.task_added().task().framework_id()); - - CHECK_NOTNULL(framework); - - frameworkInfo = Shared<FrameworkInfo>(new FrameworkInfo(framework->info)); - break; - } - case mesos::master::Event::TASK_UPDATED: { - Framework* framework = - master->getFramework(event.task_updated().framework_id()); - - CHECK_NOTNULL(framework); - - frameworkInfo = Shared<FrameworkInfo>(new FrameworkInfo(framework->info)); - - Task* storedTask = - framework->getTask(event.task_updated().status().task_id()); - - CHECK_NOTNULL(storedTask); - - task = Shared<Task>(new Task(*storedTask)); - break; - } - case mesos::master::Event::FRAMEWORK_ADDED: - case mesos::master::Event::FRAMEWORK_UPDATED: - case mesos::master::Event::FRAMEWORK_REMOVED: - case mesos::master::Event::AGENT_ADDED: - case mesos::master::Event::AGENT_REMOVED: - case mesos::master::Event::SUBSCRIBED: - case mesos::master::Event::HEARTBEAT: - case mesos::master::Event::UNKNOWN: - break; - } - // Create a single copy of the event for all subscribers to share. Shared<mesos::master::Event> sharedEvent( new mesos::master::Event(std::move(event))); + // Create a single copy of `FrameworkInfo` and `Task` for all + // subscribers to share. + Shared<FrameworkInfo> sharedFrameworkInfo( + frameworkInfo.isSome() + ? new FrameworkInfo(frameworkInfo.get()) : nullptr); + Shared<Task> sharedTask(task.isSome() ? new Task(task.get()) : nullptr); + foreachvalue (const Owned<Subscriber>& subscriber, subscribed) { Future<Owned<AuthorizationAcceptor>> authorizeRole = AuthorizationAcceptor::create( @@ -11283,8 +11261,8 @@ void Master::Subscribers::send(mesos::master::Event&& event) authorizeFramework, authorizeTask, authorizeExecutor, - frameworkInfo, - task); + sharedFrameworkInfo, + sharedTask); return Nothing(); })); @@ -11298,30 +11276,26 @@ void Master::Subscribers::Subscriber::send( const Owned<AuthorizationAcceptor>& authorizeFramework, const Owned<AuthorizationAcceptor>& authorizeTask, const Owned<AuthorizationAcceptor>& authorizeExecutor, - const Option<Shared<FrameworkInfo>>& frameworkInfo, - const Option<Shared<Task>>& task) + const Shared<FrameworkInfo>& frameworkInfo, + const Shared<Task>& task) { switch (event->type()) { case mesos::master::Event::TASK_ADDED: { - CHECK_SOME(frameworkInfo); - CHECK_NOTNULL(&frameworkInfo.get()); + CHECK_NOTNULL(frameworkInfo.get()); if (authorizeTask->accept( - event->task_added().task(), *frameworkInfo.get()) && - authorizeFramework->accept(*frameworkInfo.get())) { + event->task_added().task(), *frameworkInfo) && + authorizeFramework->accept(*frameworkInfo)) { http.send<mesos::master::Event, v1::master::Event>(*event); } break; } case mesos::master::Event::TASK_UPDATED: { - CHECK_SOME(frameworkInfo); - CHECK_NOTNULL(&frameworkInfo.get()); + CHECK_NOTNULL(frameworkInfo.get()); + CHECK_NOTNULL(task.get()); - CHECK_SOME(task); - CHECK_NOTNULL(&task.get()); - - if (authorizeTask->accept(*task.get(), *frameworkInfo.get()) && - authorizeFramework->accept(*frameworkInfo.get())) { + if (authorizeTask->accept(*task, *frameworkInfo) && + authorizeFramework->accept(*frameworkInfo)) { http.send<mesos::master::Event, v1::master::Event>(*event); } break; @@ -11553,10 +11527,6 @@ void Slave::addTask(Task* task) usedResources[frameworkId] += resources; } - if (!master->subscribers.subscribed.empty()) { - master->subscribers.send(protobuf::master::event::createTaskAdded(*task)); - } - // Note that we use `Resources` for output as it's faster than // logging raw protobuf data. LOG(INFO) << "Adding task " << taskId http://git-wip-us.apache.org/repos/asf/mesos/blob/f2ec2b28/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 6569c9e..1fadbe6 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -2004,8 +2004,8 @@ private: const process::Owned<AuthorizationAcceptor>& authorizeFramework, const process::Owned<AuthorizationAcceptor>& authorizeTask, const process::Owned<AuthorizationAcceptor>& authorizeExecutor, - const Option<process::Shared<FrameworkInfo>>& frameworkInfo, - const Option<process::Shared<Task>>& task); + const process::Shared<FrameworkInfo>& frameworkInfo, + const process::Shared<Task>& task); ~Subscriber() { @@ -2026,7 +2026,10 @@ private: }; // Sends the event to all subscribers connected to the 'api/vX' endpoint. - void send(mesos::master::Event&& event); + void send( + mesos::master::Event&& event, + const Option<FrameworkInfo>& frameworkInfo = None(), + const Option<Task>& task = None()); Master* master; @@ -2274,6 +2277,12 @@ struct Framework trackUnderRole(role); } } + + if (!master->subscribers.subscribed.empty()) { + master->subscribers.send( + protobuf::master::event::createTaskAdded(*task), + info); + } } // Update framework to recover the resources that were previously