Added registrar operations for marking agents (un-)reachable.

Review: https://reviews.apache.org/r/50701/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/af496f3a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/af496f3a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/af496f3a

Branch: refs/heads/master
Commit: af496f3a80da9a8e7961fb62f839aacf1658222e
Parents: 5405914
Author: Neil Conway <neil.con...@gmail.com>
Authored: Fri Aug 26 14:48:07 2016 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Fri Aug 26 14:48:07 2016 -0700

----------------------------------------------------------------------
 src/master/master.hpp         | 118 ++++++++++++++++++++++++++++++++++++-
 src/tests/registrar_tests.cpp |  95 ++++++++++++++++++++---------
 2 files changed, 183 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/af496f3a/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 6decff6..b118293 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1891,7 +1891,7 @@ private:
 };
 
 
-// Implementation of slave admission Registrar operation.
+// Add a new slave to the list of admitted slaves.
 class AdmitSlave : public Operation
 {
 public:
@@ -1906,7 +1906,7 @@ protected:
       hashset<SlaveID>* slaveIDs,
       bool strict)
   {
-    // Check and see if this slave already exists.
+    // Check and see if this slave is currently admitted.
     if (slaveIDs->contains(info.id())) {
       if (strict) {
         return Error("Agent already admitted");
@@ -1915,6 +1915,9 @@ protected:
       }
     }
 
+    // TODO(neilc): Check if the slave appears in the list of
+    // `unreachable` slaves in the registry?
+
     Registry::Slave* slave = registry->mutable_slaves()->add_slaves();
     slave->mutable_info()->CopyFrom(info);
     slaveIDs->insert(info.id());
@@ -1926,6 +1929,117 @@ private:
 };
 
 
+// Move a slave from the list of admitted slaves to the list of
+// unreachable slaves.
+class MarkSlaveUnreachable : public Operation
+{
+public:
+  explicit MarkSlaveUnreachable(const SlaveInfo& _info) : info(_info) {
+    CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field";
+  }
+
+protected:
+  virtual Try<bool> perform(
+      Registry* registry,
+      hashset<SlaveID>* slaveIDs,
+      bool strict)
+  {
+    // As currently implemented, this should not be possible: the
+    // master will only mark slaves unreachable that are currently
+    // admitted.
+    if (!slaveIDs->contains(info.id())) {
+      return Error("Agent not yet admitted");
+    }
+
+    for (int i = 0; i < registry->slaves().slaves().size(); i++) {
+      const Registry::Slave& slave = registry->slaves().slaves(i);
+
+      if (slave.info().id() == info.id()) {
+        registry->mutable_slaves()->mutable_slaves()->DeleteSubrange(i, 1);
+        slaveIDs->erase(info.id());
+
+        Registry::UnreachableSlave* unreachable =
+          registry->mutable_unreachable()->add_slaves();
+
+        unreachable->mutable_id()->CopyFrom(info.id());
+        unreachable->mutable_timestamp()->CopyFrom(protobuf::getCurrentTime());
+
+        return true; // Mutation.
+      }
+    }
+
+    // Should not happen.
+    return Error("Failed to find agent " + stringify(info.id()));
+  }
+
+private:
+  const SlaveInfo info;
+};
+
+
+// Add a slave back to the list of admitted slaves. The slave will
+// typically be in the "unreachable" list; if so, it is removed from
+// that list. The slave might also be in the "admitted" list already.
+// Finally, the slave might be in neither the "unreachable" or
+// "admitted" lists, if its metadata has been garbage collected from
+// the registry.
+class MarkSlaveReachable : public Operation
+{
+public:
+  explicit MarkSlaveReachable(const SlaveInfo& _info) : info(_info) {
+    CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field";
+  }
+
+protected:
+  virtual Try<bool> perform(
+      Registry* registry,
+      hashset<SlaveID>* slaveIDs,
+      bool strict)
+  {
+    // A slave might try to reregister that appears in the list of
+    // admitted slaves. This can occur when the master fails over:
+    // agents will usually attempt to reregister with the new master
+    // before they are marked unreachable. In this situation, the
+    // registry is already in the correct state, so no changes are
+    // needed.
+    if (slaveIDs->contains(info.id())) {
+      return false; // No mutation.
+    }
+
+    // Check whether the slave is in the unreachable list.
+    // TODO(neilc): Optimize this to avoid linear scan.
+    bool found = false;
+    for (int i = 0; i < registry->unreachable().slaves().size(); i++) {
+      const Registry::UnreachableSlave& slave =
+        registry->unreachable().slaves(i);
+
+      if (slave.id() == info.id()) {
+        registry->mutable_unreachable()->mutable_slaves()->DeleteSubrange(i, 
1);
+        found = true;
+        break;
+      }
+    }
+
+    if (!found) {
+      LOG(WARNING) << "Allowing UNKNOWN agent to reregister: " << info;
+    }
+
+    // Add the slave to the admitted list, even if we didn't find it
+    // in the unreachable list. This accounts for when the slave was
+    // unreachable for a long time, was GC'd from the unreachable
+    // list, but then eventually reregistered.
+    Registry::Slave* slave = registry->mutable_slaves()->add_slaves();
+    slave->mutable_info()->CopyFrom(info);
+    slaveIDs->insert(info.id());
+
+    return true; // Mutation.
+  }
+
+private:
+  const SlaveInfo info;
+};
+
+
 // Implementation of slave readmission Registrar operation.
 class ReadmitSlave : public Operation
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/af496f3a/src/tests/registrar_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/registrar_tests.cpp b/src/tests/registrar_tests.cpp
index 9a71d8f..b04fc92 100644
--- a/src/tests/registrar_tests.cpp
+++ b/src/tests/registrar_tests.cpp
@@ -236,7 +236,9 @@ TEST_P(RegistrarTest, Recover)
   AWAIT_EXPECT_FAILED(
       registrar.apply(Owned<Operation>(new AdmitSlave(slave))));
   AWAIT_EXPECT_FAILED(
-      registrar.apply(Owned<Operation>(new ReadmitSlave(slave))));
+      registrar.apply(Owned<Operation>(new MarkSlaveUnreachable(slave))));
+  AWAIT_EXPECT_FAILED(
+      registrar.apply(Owned<Operation>(new MarkSlaveReachable(slave))));
   AWAIT_EXPECT_FAILED(
       registrar.apply(Owned<Operation>(new RemoveSlave(slave))));
 
@@ -246,8 +248,10 @@ TEST_P(RegistrarTest, Recover)
   // operations to ensure they do not fail.
   Future<bool> admit = registrar.apply(
       Owned<Operation>(new AdmitSlave(slave)));
-  Future<bool> readmit = registrar.apply(
-      Owned<Operation>(new ReadmitSlave(slave)));
+  Future<bool> unreachable = registrar.apply(
+      Owned<Operation>(new MarkSlaveUnreachable(slave)));
+  Future<bool> reachable = registrar.apply(
+      Owned<Operation>(new MarkSlaveReachable(slave)));
   Future<bool> remove = registrar.apply(
       Owned<Operation>(new RemoveSlave(slave)));
 
@@ -255,7 +259,8 @@ TEST_P(RegistrarTest, Recover)
   EXPECT_EQ(master, registry.get().master().info());
 
   AWAIT_TRUE(admit);
-  AWAIT_TRUE(readmit);
+  AWAIT_TRUE(unreachable);
+  AWAIT_TRUE(reachable);
   AWAIT_TRUE(remove);
 }
 
@@ -275,7 +280,7 @@ TEST_P(RegistrarTest, Admit)
 }
 
 
-TEST_P(RegistrarTest, Readmit)
+TEST_P(RegistrarTest, MarkReachable)
 {
   Registrar registrar(flags, state);
   AWAIT_READY(registrar.recover(master));
@@ -296,13 +301,51 @@ TEST_P(RegistrarTest, Readmit)
 
   AWAIT_TRUE(registrar.apply(Owned<Operation>(new AdmitSlave(info1))));
 
-  AWAIT_TRUE(registrar.apply(Owned<Operation>(new ReadmitSlave(info1))));
+  // Attempting to mark a slave as reachable that is already reachable
+  // should not result in an error.
+  AWAIT_TRUE(registrar.apply(Owned<Operation>(new MarkSlaveReachable(info1))));
+  AWAIT_TRUE(registrar.apply(Owned<Operation>(new MarkSlaveReachable(info1))));
 
-  if (flags.registry_strict) {
-    AWAIT_FALSE(registrar.apply(Owned<Operation>(new ReadmitSlave(info2))));
-  } else {
-    AWAIT_TRUE(registrar.apply(Owned<Operation>(new ReadmitSlave(info2))));
-  }
+  // Attempting to mark a slave as reachable that is not in the
+  // unreachable list should not result in error.
+  AWAIT_TRUE(registrar.apply(Owned<Operation>(new MarkSlaveReachable(info2))));
+}
+
+
+TEST_P(RegistrarTest, MarkUnreachable)
+{
+  Registrar registrar(flags, state);
+  AWAIT_READY(registrar.recover(master));
+
+  SlaveInfo info1;
+  info1.set_hostname("localhost");
+
+  SlaveID id1;
+  id1.set_value("1");
+  info1.mutable_id()->CopyFrom(id1);
+
+  SlaveID id2;
+  id2.set_value("2");
+
+  SlaveInfo info2;
+  info2.set_hostname("localhost");
+  info2.mutable_id()->CopyFrom(id2);
+
+  AWAIT_TRUE(registrar.apply(Owned<Operation>(new AdmitSlave(info1))));
+
+  AWAIT_TRUE(
+      registrar.apply(Owned<Operation>(new MarkSlaveUnreachable(info1))));
+
+  AWAIT_TRUE(
+      registrar.apply(Owned<Operation>(new MarkSlaveReachable(info1))));
+
+  AWAIT_TRUE(
+      registrar.apply(Owned<Operation>(new MarkSlaveUnreachable(info1))));
+
+  // If a slave is already unreachable, trying to mark it unreachable
+  // again should fail.
+  AWAIT_FALSE(
+      registrar.apply(Owned<Operation>(new MarkSlaveUnreachable(info1))));
 }
 
 
@@ -977,20 +1020,17 @@ TEST_P(RegistrarTest, UpdateWeights)
 
 TEST_P(RegistrarTest, Bootstrap)
 {
-  // Run 1 readmits a slave that is not present.
+  // Run 1 simulates the reregistration of a slave that is not present
+  // in the registry.
   {
     Registrar registrar(flags, state);
     AWAIT_READY(registrar.recover(master));
 
-    // If not strict, we should be allowed to readmit the slave.
-    if (flags.registry_strict) {
-      AWAIT_FALSE(registrar.apply(Owned<Operation>(new ReadmitSlave(slave))));
-    } else {
-      AWAIT_TRUE(registrar.apply(Owned<Operation>(new ReadmitSlave(slave))));
-    }
+    AWAIT_TRUE(
+        registrar.apply(Owned<Operation>(new MarkSlaveReachable(slave))));
   }
 
-  // Run 2 should see the slave if not strict.
+  // Run 2 should see the slave.
   {
     Registrar registrar(flags, state);
 
@@ -998,12 +1038,8 @@ TEST_P(RegistrarTest, Bootstrap)
 
     AWAIT_READY(registry);
 
-    if (flags.registry_strict) {
-      EXPECT_EQ(0, registry.get().slaves().slaves().size());
-    } else {
-      ASSERT_EQ(1, registry.get().slaves().slaves().size());
-      EXPECT_EQ(slave, registry.get().slaves().slaves(0).info());
-    }
+    ASSERT_EQ(1, registry.get().slaves().slaves().size());
+    EXPECT_EQ(slave, registry.get().slaves().slaves(0).info());
   }
 }
 
@@ -1206,13 +1242,16 @@ TEST_P(Registrar_BENCHMARK_Test, Performance)
   // same as in production).
   std::random_shuffle(infos.begin(), infos.end());
 
-  // Readmit slaves.
+  // Mark slaves reachable again. This simulates the master failing
+  // over, and then the previously registered agents reregistering
+  // with the new master.
   watch.start();
   foreach (const SlaveInfo& info, infos) {
-    result = registrar.apply(Owned<Operation>(new ReadmitSlave(info)));
+    result = registrar.apply(Owned<Operation>(new MarkSlaveReachable(info)));
   }
   AWAIT_READY_FOR(result, Minutes(5));
-  LOG(INFO) << "Readmitted " << slaveCount << " agents in " << watch.elapsed();
+  LOG(INFO) << "Marked " << slaveCount
+            << " agents reachable in " << watch.elapsed();
 
   // Recover slaves.
   Registrar registrar2(flags, state);

Reply via email to