Changed reconciliation for unregistering and reregistering agents. Previously, explicit reconciliation for an agent that was in the process of reregistering or unregistering returned no results. This degree of cleverness seems unwarranted: if the agent hasn't completed the reregistration or unregistration process, it seems simpler for the master to return the previous state of the agent. This is what the framework would observe anyway if its reconcile request had been processed just before the reregister/unregister operation began.
Note that since reregistering agents are no longer considered to be "in transition", we need to slightly adjust the rules for how we update the `slaves.recovered` collection in the master: an agent remains in the "recovered" collection until it has been marked reachable in the registry (rather than removing it from "recovered" as soon as the reregistration process begins). This is more consistent with how we manage the other collections in the master anyway: an agent appears in the `recovered` list until the registry operation that reregisters it has been successfully applied. Review: https://reviews.apache.org/r/52083/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e510813f Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e510813f Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e510813f Branch: refs/heads/master Commit: e510813f93e253480005ce95cc4bd7ef094193db Parents: 2c15661 Author: Neil Conway <neil.con...@gmail.com> Authored: Wed Oct 19 16:31:37 2016 -0700 Committer: Vinod Kone <vinodk...@gmail.com> Committed: Wed Oct 19 16:31:37 2016 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 37 ++++-- src/master/master.hpp | 15 +-- src/tests/master_tests.cpp | 66 ++++++++++ src/tests/reconciliation_tests.cpp | 208 +++++++++++++++++++++++++++++--- 4 files changed, 290 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/e510813f/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 3c6b18e..324391a 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1829,8 +1829,12 @@ void Master::recoveredSlavesTimeout(const Registry& registry) // Remove the slaves in a rate limited manner, similar to how the // SlaveObserver removes 
slaves. foreach (const Registry::Slave& slave, registry.slaves().slaves()) { - // The slave is removed from 'recovered' when it re-registers. - if (!slaves.recovered.contains(slave.info().id())) { + // The slave is removed from `recovered` when it completes the + // re-registration process. If the slave is in `reregistering`, it + // has started but not yet finished re-registering. In either + // case, we don't want to try to remove it. + if (!slaves.recovered.contains(slave.info().id()) || + slaves.reregistering.contains(slave.info().id())) { continue; } @@ -1859,7 +1863,8 @@ void Master::recoveredSlavesTimeout(const Registry& registry) Nothing Master::markUnreachableAfterFailover(const Registry::Slave& slave) { - // The slave is removed from 'recovered' when it re-registers. + // The slave might have reregistered while we were waiting to + // acquire the rate limit. if (!slaves.recovered.contains(slave.info().id())) { LOG(INFO) << "Canceling transition of agent " << slave.info().id() << " (" << slave.info().hostname() << ")" @@ -1869,6 +1874,16 @@ Nothing Master::markUnreachableAfterFailover(const Registry::Slave& slave) return Nothing(); } + // The slave might be in the process of reregistering. 
+ if (slaves.reregistering.contains(slave.info().id())) { + LOG(INFO) << "Canceling transition of agent " + << slave.info().id() << " (" << slave.info().hostname() << ")" + << " to unreachable because it is re-registering"; + + ++metrics->slave_unreachable_canceled; + return Nothing(); + } + LOG(WARNING) << "Agent " << slave.info().id() << " (" << slave.info().hostname() << ") did not re-register" << " within " << flags.agent_reregister_timeout @@ -1876,8 +1891,6 @@ Nothing Master::markUnreachableAfterFailover(const Registry::Slave& slave) ++metrics->slave_unreachable_completed; - slaves.recovered.erase(slave.info().id()); - TimeInfo unreachableTime = protobuf::getCurrentTime(); slaves.markingUnreachable.insert(slave.info().id()); @@ -1900,6 +1913,9 @@ void Master::_markUnreachableAfterFailover( CHECK(slaves.markingUnreachable.contains(slaveInfo.id())); slaves.markingUnreachable.erase(slaveInfo.id()); + CHECK(slaves.recovered.contains(slaveInfo.id())); + slaves.recovered.erase(slaveInfo.id()); + if (registrarResult.isFailed()) { LOG(FATAL) << "Failed to mark agent " << slaveInfo.id() << " (" << slaveInfo.hostname() << ")" @@ -5233,6 +5249,8 @@ void Master::reregisterSlave( Slave* slave = slaves.registered.get(slaveInfo.id()); if (slave != nullptr) { + CHECK(!slaves.recovered.contains(slaveInfo.id())); + slave->reregisteredTime = Clock::now(); // NOTE: This handles the case where a slave tries to @@ -5302,10 +5320,6 @@ void Master::reregisterSlave( return; } - // Ensure we don't remove the slave for not re-registering after - // we've recovered it from the registry. - slaves.recovered.erase(slaveInfo.id()); - // If we're already re-registering this slave, then no need to ask // the registrar again. if (slaves.reregistering.contains(slaveInfo.id())) { @@ -5367,6 +5381,11 @@ void Master::_reregisterSlave( CHECK(readmit.get()); // Re-admission succeeded. + + // Ensure we don't remove the slave for not re-registering after + // we've recovered it from the registry. 
+ slaves.recovered.erase(slaveInfo.id()); + MachineID machineId; machineId.set_hostname(slaveInfo.hostname()); machineId.set_ip(stringify(pid.address.ip)); http://git-wip-us.apache.org/repos/asf/mesos/blob/e510813f/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 881f0d6..263ceb4 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -1616,9 +1616,10 @@ private: // registry to re-register with the master. Option<process::Timer> recoveredTimer; - // Slaves that have been recovered from the registrar but have yet - // to re-register. We use `recoveredTimer` above to ensure we - // remove these slaves if they do not re-register. + // Slaves that have been recovered from the registrar after master + // failover. Slaves are removed from this collection when they + // either re-register with the master or are marked unreachable + // because they do not re-register before `recoveredTimer` fires. hashset<SlaveID> recovered; // Slaves that are in the process of registering. 
@@ -1726,13 +1727,9 @@ private: bool transitioning(const Option<SlaveID>& slaveId) { if (slaveId.isSome()) { - return recovered.contains(slaveId.get()) || - reregistering.contains(slaveId.get()) || - removing.contains(slaveId.get()); + return recovered.contains(slaveId.get()); } else { - return !recovered.empty() || - !reregistering.empty() || - !removing.empty(); + return !recovered.empty(); } } } slaves; http://git-wip-us.apache.org/repos/asf/mesos/blob/e510813f/src/tests/master_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp index 88cf1e6..df492d3 100644 --- a/src/tests/master_tests.cpp +++ b/src/tests/master_tests.cpp @@ -2222,6 +2222,72 @@ TEST_F(MasterTest, RecoveredSlaveReregisters) } +// This test checks that the master behaves correctly when a slave is +// in the process of reregistering after master failover when the +// agent failover timeout expires. +TEST_F(MasterTest, RecoveredSlaveReregisterThenUnreachableRace) +{ + master::Flags masterFlags = CreateMasterFlags(); + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _); + + // Reuse slaveFlags so both StartSlave() use the same work_dir. + slave::Flags slaveFlags = this->CreateSlaveFlags(); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + AWAIT_READY(slaveRegisteredMessage); + + // Stop the slave while the master is down. + master->reset(); + slave.get()->terminate(); + slave->reset(); + + // Restart the master. + master = StartMaster(masterFlags); + ASSERT_SOME(master); + + // Start the slave, which will cause it to reregister. Intercept the + // next registry operation, which we expect to be slave reregistration. 
+ Future<ReregisterSlaveMessage> reregisterSlaveMessage = + FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, master.get()->pid); + + Future<Owned<master::Operation>> reregister; + Promise<bool> reregisterContinue; + EXPECT_CALL(*master.get()->registrar.get(), apply(_)) + .WillOnce(DoAll(FutureArg<0>(&reregister), + Return(reregisterContinue.future()))); + + detector = master.get()->createDetector(); + slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + AWAIT_READY(reregisterSlaveMessage); + + AWAIT_READY(reregister); + EXPECT_NE( + nullptr, + dynamic_cast<master::MarkSlaveReachable*>( + reregister.get().get())); + + // Advance the clock to cause the agent reregister timeout to + // expire. Because slave reregistration has already started, we do + // NOT expect the master to mark the slave unreachable. Hence we + // don't expect to see any registry operations. + EXPECT_CALL(*master.get()->registrar.get(), apply(_)) + .Times(0); + + Clock::pause(); + Clock::advance(masterFlags.agent_reregister_timeout); + Clock::settle(); +} + + #ifdef MESOS_HAS_JAVA class MasterZooKeeperTest : public MesosZooKeeperTest {}; http://git-wip-us.apache.org/repos/asf/mesos/blob/e510813f/src/tests/reconciliation_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp index 1412090..d2bba48 100644 --- a/src/tests/reconciliation_tests.cpp +++ b/src/tests/reconciliation_tests.cpp @@ -376,9 +376,10 @@ TEST_F(ReconciliationTest, UnknownKillTask) } -// This test verifies that reconciliation of a task that belongs to a -// slave that is in a transitional state doesn't result in an update. -TEST_F(ReconciliationTest, SlaveInTransition) +// This test verifies that explicit reconciliation does not return any +// results for tasks running on an agent that has been recovered from +// the registry after master failover but has not yet reregistered. 
+TEST_F(ReconciliationTest, RecoveredAgent) { master::Flags masterFlags = CreateMasterFlags(); Try<Owned<cluster::Master>> master = StartMaster(); @@ -398,15 +399,105 @@ TEST_F(ReconciliationTest, SlaveInTransition) AWAIT_READY(slaveRegisteredMessage); const SlaveID slaveId = slaveRegisteredMessage.get().slave_id(); + // Stop the master. + master->reset(); + + // Stop the slave. + slave.get()->terminate(); + slave->reset(); + + // Restart the master. + master = StartMaster(masterFlags); + ASSERT_SOME(master); + MockScheduler sched; MesosSchedulerDriver driver( &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); - // Stop the slave and master (a bit later). + Future<FrameworkID> frameworkId; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureArg<1>(&frameworkId)); + + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillRepeatedly(Return()); // Ignore offers. + + // Framework should not receive any task status updates. + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .Times(0); + + driver.start(); + + // Wait for the framework to register. + AWAIT_READY(frameworkId); + + // Do reconciliation before the agent has attempted to reregister. + // This should not yield any results. + Future<mesos::scheduler::Call> reconcileCall = FUTURE_CALL( + mesos::scheduler::Call(), mesos::scheduler::Call::RECONCILE, _ , _); + + // Reconcile for a random task ID on the slave. + TaskStatus status; + status.mutable_task_id()->set_value(UUID::random().toString()); + status.mutable_slave_id()->CopyFrom(slaveId); + status.set_state(TASK_STAGING); // Dummy value. + + driver.reconcileTasks({status}); + + // Make sure the master received the reconcile call. + AWAIT_READY(reconcileCall); + + // The Clock::settle() will ensure that framework would receive + // a status update if it is sent by the master. In this test it + // shouldn't receive any. 
+ Clock::pause(); + Clock::settle(); + + Clock::resume(); + + driver.stop(); + driver.join(); +} + + +// This test verifies that explicit reconciliation does not return any +// results for tasks running on an agent that has been recovered from +// the registry after master failover, where the agent has started the +// reregistration process but has not completed it yet. +TEST_F(ReconciliationTest, RecoveredAgentReregistrationInProgress) +{ + master::Flags masterFlags = CreateMasterFlags(); + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + // Reuse slaveFlags so both StartSlave() use the same work_dir. + slave::Flags slaveFlags = CreateSlaveFlags(); + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + // Wait for the slave to register and get the slave id. + AWAIT_READY(slaveRegisteredMessage); + const SlaveID slaveId = slaveRegisteredMessage.get().slave_id(); + + // Stop the master. master->reset(); + + // Stop the slave. slave.get()->terminate(); slave->reset(); + // Restart the master. + master = StartMaster(masterFlags); + ASSERT_SOME(master); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); + Future<FrameworkID> frameworkId; EXPECT_CALL(sched, registered(&driver, _, _)) .WillOnce(FutureArg<1>(&frameworkId)); @@ -414,13 +505,14 @@ TEST_F(ReconciliationTest, SlaveInTransition) EXPECT_CALL(sched, resourceOffers(&driver, _)) .WillRepeatedly(Return()); // Ignore offers. - // Framework should not receive any update. + // Framework should not receive any task status updates. EXPECT_CALL(sched, statusUpdate(&driver, _)) .Times(0); - // Restart the master. 
- master = StartMaster(masterFlags); - ASSERT_SOME(master); + driver.start(); + + // Wait for the framework to register. + AWAIT_READY(frameworkId); // Intercept the first registrar operation that is attempted; this // should be the registry operation that reregisters the slave. @@ -430,11 +522,6 @@ TEST_F(ReconciliationTest, SlaveInTransition) .WillOnce(DoAll(FutureArg<0>(&reregister), Return(promise.future()))); - driver.start(); - - // Wait for the framework to register. - AWAIT_READY(frameworkId); - // Restart the slave. detector = master.get()->createDetector(); slave = StartSlave(detector.get(), slaveFlags); @@ -449,9 +536,7 @@ TEST_F(ReconciliationTest, SlaveInTransition) Future<mesos::scheduler::Call> reconcileCall = FUTURE_CALL( mesos::scheduler::Call(), mesos::scheduler::Call::RECONCILE, _ , _); - Clock::pause(); - - // Create a task status with a random task id. + // Reconcile for a random task ID on the slave. TaskStatus status; status.mutable_task_id()->set_value(UUID::random().toString()); status.mutable_slave_id()->CopyFrom(slaveId); @@ -462,11 +547,98 @@ TEST_F(ReconciliationTest, SlaveInTransition) // Make sure the master received the reconcile call. AWAIT_READY(reconcileCall); - // The Clock::settle() will ensure that framework would receive - // a status update if it is sent by the master. In this test it + // The Clock::settle() will ensure that the framework receives a + // status update if it is sent by the master. In this test it // shouldn't receive any. + Clock::pause(); Clock::settle(); + Clock::resume(); + + driver.stop(); + driver.join(); +} + + +// This test ensures that when an agent has started but not finished +// the unregistration process, explicit reconciliation indicates that +// the agent is still registered. 
+TEST_F(ReconciliationTest, RemovalInProgress) +{ + master::Flags masterFlags = CreateMasterFlags(); + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get()); + ASSERT_SOME(slave); + + // Wait for the slave to register and get the slave id. + AWAIT_READY(slaveRegisteredMessage); + const SlaveID slaveId = slaveRegisteredMessage.get().slave_id(); + + Future<UnregisterSlaveMessage> unregisterSlaveMessage = + FUTURE_PROTOBUF( + UnregisterSlaveMessage(), + slave.get()->pid, + master.get()->pid); + + // Intercept the next registrar operation; this should be the + // registry operation that unregisters the slave. + Future<Owned<master::Operation>> unregister; + Future<Nothing> unregisterStarted; + Promise<bool> promise; // Never satisfied. + EXPECT_CALL(*master.get()->registrar.get(), apply(_)) + .WillOnce(DoAll(FutureArg<0>(&unregister), + Return(promise.future()))); + + // Cause the slave to shutdown gracefully. + slave.get()->shutdown(); + slave->reset(); + + AWAIT_READY(unregisterSlaveMessage); + + AWAIT_READY(unregister); + EXPECT_NE( + nullptr, + dynamic_cast<master::RemoveSlave*>(unregister.get().get())); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); + + Future<FrameworkID> frameworkId; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureArg<1>(&frameworkId)); + + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillRepeatedly(Return()); // Ignore offers. + + driver.start(); + + // Wait for the framework to register. + AWAIT_READY(frameworkId); + + Future<TaskStatus> update; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&update)); + + // Reconcile for a random task ID on the slave. 
+ TaskStatus status; + status.mutable_task_id()->set_value(UUID::random().toString()); + status.mutable_slave_id()->CopyFrom(slaveId); + status.set_state(TASK_STAGING); // Dummy value. + + driver.reconcileTasks({status}); + + AWAIT_READY(update); + EXPECT_EQ(TASK_LOST, update.get().state()); + EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, update.get().reason()); + EXPECT_FALSE(update.get().has_unreachable_time()); + driver.stop(); driver.join(); }