Repository: mesos
Updated Branches:
  refs/heads/master 28ab19b11 -> b4e210678


Fixed a master API bug for agent re-registration after master failover.

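When the master fails over and a client subscribes to the master API before
an agent re-registers, the master crashes while sending `TASK_ADDED`,
because the framework info might not have been added to the master yet.
This patch fixes the bug by sending `TASK_ADDED` from `Framework::addTask()`
instead of `Slave::addTask()`, and by having callers pass the relevant
`FrameworkInfo` and `Task` to `Subscribers::send()` instead of having it
look them up in the master.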

Review: https://reviews.apache.org/r/65774/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f2ec2b28
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f2ec2b28
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f2ec2b28

Branch: refs/heads/master
Commit: f2ec2b288e823424b2efe71d62ef90101b7a863f
Parents: 28ab19b
Author: Chun-Hung Hsiao <chhs...@mesosphere.io>
Authored: Fri Feb 23 18:37:12 2018 -0800
Committer: Greg Mann <gregorywm...@gmail.com>
Committed: Fri Feb 23 18:37:12 2018 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 98 ++++++++++++++++------------------------------
 src/master/master.hpp | 15 +++++--
 2 files changed, 46 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f2ec2b28/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index e7d5ac6..f2c2dbe 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -10241,8 +10241,18 @@ void Master::updateTask(Task* task, const StatusUpdate& update)
   task->mutable_statuses(task->statuses_size() - 1)->clear_data();
 
   if (sendSubscribersUpdate && !subscribers.subscribed.empty()) {
-    subscribers.send(protobuf::master::event::createTaskUpdated(
-        *task, task->state(), status));
+    // If the framework has been removed, the task would have already
+    // transitioned to `TASK_KILLED` by `removeFramework()`, thus
+    // `sendSubscribersUpdate` shouldn't have been set to true.
+    // TODO(chhsiao): This may be changed after MESOS-6608 is resolved.
+    Framework* framework = getFramework(task->framework_id());
+    CHECK_NOTNULL(framework);
+
+    subscribers.send(
+        protobuf::master::event::createTaskUpdated(
+            *task, task->state(), status),
+        framework->info,
+        *task);
   }
 
   LOG(INFO) << "Updating the state of task " << task->task_id()
@@ -11185,57 +11195,25 @@ static bool isValidFailoverTimeout(const FrameworkInfo& frameworkInfo)
 }
 
 
-void Master::Subscribers::send(mesos::master::Event&& event)
+void Master::Subscribers::send(
+    mesos::master::Event&& event,
+    const Option<FrameworkInfo>& frameworkInfo,
+    const Option<Task>& task)
 {
   VLOG(1) << "Notifying all active subscribers about " << event.type()
           << " event";
 
-  Option<Shared<FrameworkInfo>> frameworkInfo;
-  Option<Shared<Task>> task;
-
-  // Copy metadata associated with the event if necessary, so that we capture
-  // the current state before we make asynchronous calls below.
-  switch (event.type()) {
-    case mesos::master::Event::TASK_ADDED: {
-      Framework* framework =
-        master->getFramework(event.task_added().task().framework_id());
-
-      CHECK_NOTNULL(framework);
-
-      frameworkInfo = Shared<FrameworkInfo>(new 
FrameworkInfo(framework->info));
-      break;
-    }
-    case mesos::master::Event::TASK_UPDATED: {
-      Framework* framework =
-        master->getFramework(event.task_updated().framework_id());
-
-      CHECK_NOTNULL(framework);
-
-      frameworkInfo = Shared<FrameworkInfo>(new 
FrameworkInfo(framework->info));
-
-      Task* storedTask =
-        framework->getTask(event.task_updated().status().task_id());
-
-      CHECK_NOTNULL(storedTask);
-
-      task = Shared<Task>(new Task(*storedTask));
-      break;
-    }
-    case mesos::master::Event::FRAMEWORK_ADDED:
-    case mesos::master::Event::FRAMEWORK_UPDATED:
-    case mesos::master::Event::FRAMEWORK_REMOVED:
-    case mesos::master::Event::AGENT_ADDED:
-    case mesos::master::Event::AGENT_REMOVED:
-    case mesos::master::Event::SUBSCRIBED:
-    case mesos::master::Event::HEARTBEAT:
-    case mesos::master::Event::UNKNOWN:
-      break;
-  }
-
   // Create a single copy of the event for all subscribers to share.
   Shared<mesos::master::Event> sharedEvent(
       new mesos::master::Event(std::move(event)));
 
+  // Create a single copy of `FrameworkInfo` and `Task` for all
+  // subscribers to share.
+  Shared<FrameworkInfo> sharedFrameworkInfo(
+      frameworkInfo.isSome()
+        ? new FrameworkInfo(frameworkInfo.get()) : nullptr);
+  Shared<Task> sharedTask(task.isSome() ? new Task(task.get()) : nullptr);
+
   foreachvalue (const Owned<Subscriber>& subscriber, subscribed) {
     Future<Owned<AuthorizationAcceptor>> authorizeRole =
       AuthorizationAcceptor::create(
@@ -11283,8 +11261,8 @@ void Master::Subscribers::send(mesos::master::Event&& event)
             authorizeFramework,
             authorizeTask,
             authorizeExecutor,
-            frameworkInfo,
-            task);
+            sharedFrameworkInfo,
+            sharedTask);
 
         return Nothing();
       }));
@@ -11298,30 +11276,26 @@ void Master::Subscribers::Subscriber::send(
     const Owned<AuthorizationAcceptor>& authorizeFramework,
     const Owned<AuthorizationAcceptor>& authorizeTask,
     const Owned<AuthorizationAcceptor>& authorizeExecutor,
-    const Option<Shared<FrameworkInfo>>& frameworkInfo,
-    const Option<Shared<Task>>& task)
+    const Shared<FrameworkInfo>& frameworkInfo,
+    const Shared<Task>& task)
 {
   switch (event->type()) {
     case mesos::master::Event::TASK_ADDED: {
-      CHECK_SOME(frameworkInfo);
-      CHECK_NOTNULL(&frameworkInfo.get());
+      CHECK_NOTNULL(frameworkInfo.get());
 
       if (authorizeTask->accept(
-              event->task_added().task(), *frameworkInfo.get()) &&
-          authorizeFramework->accept(*frameworkInfo.get())) {
+              event->task_added().task(), *frameworkInfo) &&
+          authorizeFramework->accept(*frameworkInfo)) {
         http.send<mesos::master::Event, v1::master::Event>(*event);
       }
       break;
     }
     case mesos::master::Event::TASK_UPDATED: {
-      CHECK_SOME(frameworkInfo);
-      CHECK_NOTNULL(&frameworkInfo.get());
+      CHECK_NOTNULL(frameworkInfo.get());
+      CHECK_NOTNULL(task.get());
 
-      CHECK_SOME(task);
-      CHECK_NOTNULL(&task.get());
-
-      if (authorizeTask->accept(*task.get(), *frameworkInfo.get()) &&
-          authorizeFramework->accept(*frameworkInfo.get())) {
+      if (authorizeTask->accept(*task, *frameworkInfo) &&
+          authorizeFramework->accept(*frameworkInfo)) {
         http.send<mesos::master::Event, v1::master::Event>(*event);
       }
       break;
@@ -11553,10 +11527,6 @@ void Slave::addTask(Task* task)
     usedResources[frameworkId] += resources;
   }
 
-  if (!master->subscribers.subscribed.empty()) {
-    master->subscribers.send(protobuf::master::event::createTaskAdded(*task));
-  }
-
   // Note that we use `Resources` for output as it's faster than
   // logging raw protobuf data.
   LOG(INFO) << "Adding task " << taskId

http://git-wip-us.apache.org/repos/asf/mesos/blob/f2ec2b28/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 6569c9e..1fadbe6 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -2004,8 +2004,8 @@ private:
           const process::Owned<AuthorizationAcceptor>& authorizeFramework,
           const process::Owned<AuthorizationAcceptor>& authorizeTask,
           const process::Owned<AuthorizationAcceptor>& authorizeExecutor,
-          const Option<process::Shared<FrameworkInfo>>& frameworkInfo,
-          const Option<process::Shared<Task>>& task);
+          const process::Shared<FrameworkInfo>& frameworkInfo,
+          const process::Shared<Task>& task);
 
       ~Subscriber()
       {
@@ -2026,7 +2026,10 @@ private:
     };
 
     // Sends the event to all subscribers connected to the 'api/vX' endpoint.
-    void send(mesos::master::Event&& event);
+    void send(
+        mesos::master::Event&& event,
+        const Option<FrameworkInfo>& frameworkInfo = None(),
+        const Option<Task>& task = None());
 
     Master* master;
 
@@ -2274,6 +2277,12 @@ struct Framework
         trackUnderRole(role);
       }
     }
+
+    if (!master->subscribers.subscribed.empty()) {
+      master->subscribers.send(
+          protobuf::master::event::createTaskAdded(*task),
+          info);
+    }
   }
 
   // Update framework to recover the resources that were previously

Reply via email to