Added registrar operations for marking agents (un-)reachable. Review: https://reviews.apache.org/r/50701/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/af496f3a Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/af496f3a Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/af496f3a Branch: refs/heads/master Commit: af496f3a80da9a8e7961fb62f839aacf1658222e Parents: 5405914 Author: Neil Conway <neil.con...@gmail.com> Authored: Fri Aug 26 14:48:07 2016 -0700 Committer: Vinod Kone <vinodk...@gmail.com> Committed: Fri Aug 26 14:48:07 2016 -0700 ---------------------------------------------------------------------- src/master/master.hpp | 118 ++++++++++++++++++++++++++++++++++++- src/tests/registrar_tests.cpp | 95 ++++++++++++++++++++--------- 2 files changed, 183 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/af496f3a/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 6decff6..b118293 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -1891,7 +1891,7 @@ private: }; -// Implementation of slave admission Registrar operation. +// Add a new slave to the list of admitted slaves. class AdmitSlave : public Operation { public: @@ -1906,7 +1906,7 @@ protected: hashset<SlaveID>* slaveIDs, bool strict) { - // Check and see if this slave already exists. + // Check and see if this slave is currently admitted. if (slaveIDs->contains(info.id())) { if (strict) { return Error("Agent already admitted"); @@ -1915,6 +1915,9 @@ protected: } } + // TODO(neilc): Check if the slave appears in the list of + // `unreachable` slaves in the registry? + Registry::Slave* slave = registry->mutable_slaves()->add_slaves(); slave->mutable_info()->CopyFrom(info); slaveIDs->insert(info.id()); @@ -1926,6 +1929,117 @@ private: }; +// Move a slave from the list of admitted slaves to the list of +// unreachable slaves. +class MarkSlaveUnreachable : public Operation +{ +public: + explicit MarkSlaveUnreachable(const SlaveInfo& _info) : info(_info) { + CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field"; + } + +protected: + virtual Try<bool> perform( + Registry* registry, + hashset<SlaveID>* slaveIDs, + bool strict) + { + // As currently implemented, this should not be possible: the + // master will only mark slaves unreachable that are currently + // admitted. + if (!slaveIDs->contains(info.id())) { + return Error("Agent not yet admitted"); + } + + for (int i = 0; i < registry->slaves().slaves().size(); i++) { + const Registry::Slave& slave = registry->slaves().slaves(i); + + if (slave.info().id() == info.id()) { + registry->mutable_slaves()->mutable_slaves()->DeleteSubrange(i, 1); + slaveIDs->erase(info.id()); + + Registry::UnreachableSlave* unreachable = + registry->mutable_unreachable()->add_slaves(); + + unreachable->mutable_id()->CopyFrom(info.id()); + unreachable->mutable_timestamp()->CopyFrom(protobuf::getCurrentTime()); + + return true; // Mutation. + } + } + + // Should not happen. + return Error("Failed to find agent " + stringify(info.id())); + } + +private: + const SlaveInfo info; +}; + + +// Add a slave back to the list of admitted slaves. The slave will +// typically be in the "unreachable" list; if so, it is removed from +// that list. The slave might also be in the "admitted" list already. +// Finally, the slave might be in neither the "unreachable" or +// "admitted" lists, if its metadata has been garbage collected from +// the registry. +class MarkSlaveReachable : public Operation +{ +public: + explicit MarkSlaveReachable(const SlaveInfo& _info) : info(_info) { + CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field"; + } + +protected: + virtual Try<bool> perform( + Registry* registry, + hashset<SlaveID>* slaveIDs, + bool strict) + { + // A slave might try to reregister that appears in the list of + // admitted slaves. This can occur when the master fails over: + // agents will usually attempt to reregister with the new master + // before they are marked unreachable. In this situation, the + // registry is already in the correct state, so no changes are + // needed. + if (slaveIDs->contains(info.id())) { + return false; // No mutation. + } + + // Check whether the slave is in the unreachable list. + // TODO(neilc): Optimize this to avoid linear scan. + bool found = false; + for (int i = 0; i < registry->unreachable().slaves().size(); i++) { + const Registry::UnreachableSlave& slave = + registry->unreachable().slaves(i); + + if (slave.id() == info.id()) { + registry->mutable_unreachable()->mutable_slaves()->DeleteSubrange(i, 1); + found = true; + break; + } + } + + if (!found) { + LOG(WARNING) << "Allowing UNKNOWN agent to reregister: " << info; + } + + // Add the slave to the admitted list, even if we didn't find it + // in the unreachable list. This accounts for when the slave was + // unreachable for a long time, was GC'd from the unreachable + // list, but then eventually reregistered. + Registry::Slave* slave = registry->mutable_slaves()->add_slaves(); + slave->mutable_info()->CopyFrom(info); + slaveIDs->insert(info.id()); + + return true; // Mutation. + } + +private: + const SlaveInfo info; +}; + + // Implementation of slave readmission Registrar operation. class ReadmitSlave : public Operation { http://git-wip-us.apache.org/repos/asf/mesos/blob/af496f3a/src/tests/registrar_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/registrar_tests.cpp b/src/tests/registrar_tests.cpp index 9a71d8f..b04fc92 100644 --- a/src/tests/registrar_tests.cpp +++ b/src/tests/registrar_tests.cpp @@ -236,7 +236,9 @@ TEST_P(RegistrarTest, Recover) AWAIT_EXPECT_FAILED( registrar.apply(Owned<Operation>(new AdmitSlave(slave)))); AWAIT_EXPECT_FAILED( - registrar.apply(Owned<Operation>(new ReadmitSlave(slave)))); + registrar.apply(Owned<Operation>(new MarkSlaveUnreachable(slave)))); + AWAIT_EXPECT_FAILED( + registrar.apply(Owned<Operation>(new MarkSlaveReachable(slave)))); AWAIT_EXPECT_FAILED( registrar.apply(Owned<Operation>(new RemoveSlave(slave)))); @@ -246,8 +248,10 @@ TEST_P(RegistrarTest, Recover) // operations to ensure they do not fail. Future<bool> admit = registrar.apply( Owned<Operation>(new AdmitSlave(slave))); - Future<bool> readmit = registrar.apply( - Owned<Operation>(new ReadmitSlave(slave))); + Future<bool> unreachable = registrar.apply( + Owned<Operation>(new MarkSlaveUnreachable(slave))); + Future<bool> reachable = registrar.apply( + Owned<Operation>(new MarkSlaveReachable(slave))); Future<bool> remove = registrar.apply( Owned<Operation>(new RemoveSlave(slave))); @@ -255,7 +259,8 @@ TEST_P(RegistrarTest, Recover) EXPECT_EQ(master, registry.get().master().info()); AWAIT_TRUE(admit); - AWAIT_TRUE(readmit); + AWAIT_TRUE(unreachable); + AWAIT_TRUE(reachable); AWAIT_TRUE(remove); } @@ -275,7 +280,7 @@ TEST_P(RegistrarTest, Admit) } -TEST_P(RegistrarTest, Readmit) +TEST_P(RegistrarTest, MarkReachable) { Registrar registrar(flags, state); AWAIT_READY(registrar.recover(master)); @@ -296,13 +301,51 @@ TEST_P(RegistrarTest, Readmit) AWAIT_TRUE(registrar.apply(Owned<Operation>(new AdmitSlave(info1)))); - AWAIT_TRUE(registrar.apply(Owned<Operation>(new ReadmitSlave(info1)))); + // Attempting to mark a slave as reachable that is already reachable + // should not result in an error. + AWAIT_TRUE(registrar.apply(Owned<Operation>(new MarkSlaveReachable(info1)))); + AWAIT_TRUE(registrar.apply(Owned<Operation>(new MarkSlaveReachable(info1)))); - if (flags.registry_strict) { - AWAIT_FALSE(registrar.apply(Owned<Operation>(new ReadmitSlave(info2)))); - } else { - AWAIT_TRUE(registrar.apply(Owned<Operation>(new ReadmitSlave(info2)))); - } + // Attempting to mark a slave as reachable that is not in the + // unreachable list should not result in error. + AWAIT_TRUE(registrar.apply(Owned<Operation>(new MarkSlaveReachable(info2)))); +} + + +TEST_P(RegistrarTest, MarkUnreachable) +{ + Registrar registrar(flags, state); + AWAIT_READY(registrar.recover(master)); + + SlaveInfo info1; + info1.set_hostname("localhost"); + + SlaveID id1; + id1.set_value("1"); + info1.mutable_id()->CopyFrom(id1); + + SlaveID id2; + id2.set_value("2"); + + SlaveInfo info2; + info2.set_hostname("localhost"); + info2.mutable_id()->CopyFrom(id2); + + AWAIT_TRUE(registrar.apply(Owned<Operation>(new AdmitSlave(info1)))); + + AWAIT_TRUE( + registrar.apply(Owned<Operation>(new MarkSlaveUnreachable(info1)))); + + AWAIT_TRUE( + registrar.apply(Owned<Operation>(new MarkSlaveReachable(info1)))); + + AWAIT_TRUE( + registrar.apply(Owned<Operation>(new MarkSlaveUnreachable(info1)))); + + // If a slave is already unreachable, trying to mark it unreachable + // again should fail. + AWAIT_FALSE( + registrar.apply(Owned<Operation>(new MarkSlaveUnreachable(info1)))); } @@ -977,20 +1020,17 @@ TEST_P(RegistrarTest, UpdateWeights) TEST_P(RegistrarTest, Bootstrap) { - // Run 1 readmits a slave that is not present. + // Run 1 simulates the reregistration of a slave that is not present + // in the registry. { Registrar registrar(flags, state); AWAIT_READY(registrar.recover(master)); - // If not strict, we should be allowed to readmit the slave. - if (flags.registry_strict) { - AWAIT_FALSE(registrar.apply(Owned<Operation>(new ReadmitSlave(slave)))); - } else { - AWAIT_TRUE(registrar.apply(Owned<Operation>(new ReadmitSlave(slave)))); - } + AWAIT_TRUE( + registrar.apply(Owned<Operation>(new MarkSlaveReachable(slave)))); } - // Run 2 should see the slave if not strict. + // Run 2 should see the slave. { Registrar registrar(flags, state); @@ -998,12 +1038,8 @@ TEST_P(RegistrarTest, Bootstrap) AWAIT_READY(registry); - if (flags.registry_strict) { - EXPECT_EQ(0, registry.get().slaves().slaves().size()); - } else { - ASSERT_EQ(1, registry.get().slaves().slaves().size()); - EXPECT_EQ(slave, registry.get().slaves().slaves(0).info()); - } + ASSERT_EQ(1, registry.get().slaves().slaves().size()); + EXPECT_EQ(slave, registry.get().slaves().slaves(0).info()); } } @@ -1206,13 +1242,16 @@ TEST_P(Registrar_BENCHMARK_Test, Performance) // same as in production). std::random_shuffle(infos.begin(), infos.end()); - // Readmit slaves. + // Mark slaves reachable again. This simulates the master failing + // over, and then the previously registered agents reregistering + // with the new master. watch.start(); foreach (const SlaveInfo& info, infos) { - result = registrar.apply(Owned<Operation>(new ReadmitSlave(info))); + result = registrar.apply(Owned<Operation>(new MarkSlaveReachable(info))); } AWAIT_READY_FOR(result, Minutes(5)); - LOG(INFO) << "Readmitted " << slaveCount << " agents in " << watch.elapsed(); + LOG(INFO) << "Marked " << slaveCount + << " agents reachable in " << watch.elapsed(); // Recover slaves. Registrar registrar2(flags, state);