Fixed bug when marking agents unreachable after master failover.

If the master fails over and an agent does not re-register within the
`agent_reregister_timeout`, the master marks the agent as unreachable
in the registry and sends `slaveLost` for it. However, we neglected to
update the master's in-memory state for the newly unreachable agent;
this meant that task reconciliation would return incorrect results
(until/unless the next master failover).

Review: https://reviews.apache.org/r/53097/
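For illustration only, here is a minimal, self-contained sketch of the in-memory bookkeeping described above. The `UnreachableAgents` type, its members, and `main()` are invented for this example and are not Mesos code; the actual fix is the one-line update to `slaves.unreachable` in `Master::_markUnreachableAfterFailover`, shown in the diff below.

// Illustrative sketch only -- simplified stand-ins for Mesos types.
#include <iostream>
#include <map>
#include <string>

// Stand-in for mesos::TimeInfo (nanoseconds since the epoch).
struct TimeInfo { long long nanoseconds; };

// Stand-in for the master's in-memory record of unreachable agents.
struct UnreachableAgents
{
  // Keyed by agent ID; the value is when the agent was marked unreachable.
  std::map<std::string, TimeInfo> unreachable;

  // Must be called once the registry operation completes; without this
  // step, reconciliation has no record that the agent is unreachable.
  void markUnreachable(const std::string& agentId, const TimeInfo& when)
  {
    unreachable[agentId] = when;
  }

  // Reconciliation consults only this in-memory map, not the registry.
  bool isUnreachable(const std::string& agentId) const
  {
    return unreachable.count(agentId) > 0;
  }
};

int main()
{
  UnreachableAgents agents;

  // Registry updated *and* in-memory map updated: reconciliation now
  // reports the agent (and its tasks) as unreachable.
  agents.markUnreachable("agent-1", TimeInfo{1477084732000000000LL});

  std::cout << std::boolalpha
            << agents.isUnreachable("agent-1") << std::endl; // true
  return 0;
}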

Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/61a0a10d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/61a0a10d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/61a0a10d

Branch: refs/heads/master
Commit: 61a0a10d4a83695a76115385b2401e6059062a8a
Parents: 27eb6b7
Author: Neil Conway <neil.con...@gmail.com>
Authored: Fri Oct 21 14:18:52 2016 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Fri Oct 21 14:18:52 2016 -0700

----------------------------------------------------------------------
 src/master/master.cpp      |   4 ++
 src/master/master.hpp      |   1 +
 src/tests/master_tests.cpp | 148 +++++++++++++++++++++++++++++++++++++++-
 3 files changed, 152 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/61a0a10d/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 500cca4..23ddb99 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1900,6 +1900,7 @@ Nothing Master::markUnreachableAfterFailover(const Registry::Slave& slave)
     .onAny(defer(self(),
                  &Self::_markUnreachableAfterFailover,
                  slave.info(),
+                 unreachableTime,
                  lambda::_1));
 
   return Nothing();
@@ -1908,6 +1909,7 @@ Nothing Master::markUnreachableAfterFailover(const Registry::Slave& slave)
 
 void Master::_markUnreachableAfterFailover(
     const SlaveInfo& slaveInfo,
+    const TimeInfo& unreachableTime,
     const Future<bool>& registrarResult)
 {
   CHECK(slaves.markingUnreachable.contains(slaveInfo.id()));
@@ -1936,6 +1938,8 @@ void Master::_markUnreachableAfterFailover(
   ++metrics->slave_removals_reason_unhealthy;
   ++metrics->recovery_slave_removals;
 
+  slaves.unreachable[slaveInfo.id()] = unreachableTime;
+
   sendSlaveLost(slaveInfo);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/61a0a10d/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 41f8480..87186c6 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -692,6 +692,7 @@ protected:
 
   void _markUnreachableAfterFailover(
       const SlaveInfo& slaveInfo,
+      const TimeInfo& unreachableTime,
       const process::Future<bool>& registrarResult);
 
   void sendSlaveLost(const SlaveInfo& slaveInfo);

http://git-wip-us.apache.org/repos/asf/mesos/blob/61a0a10d/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 033fae3..2634b25 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -2132,7 +2132,7 @@ TEST_F(MasterTest, RecoveredSlaveCanReregister)
   AWAIT_READY(registered);
 
   // Step 6: Advance the clock until the re-registration timeout
-  // elapses, and expect the slave / task to be lost!
+  // elapses, and expect the slave to be lost!
   Future<Nothing> slaveLost;
   EXPECT_CALL(sched, slaveLost(&driver, _))
     .WillOnce(FutureSatisfy(&slaveLost));
@@ -2173,6 +2173,152 @@ TEST_F(MasterTest, RecoveredSlaveCanReregister)
 }
 
 
+// This test ensures that when a master fails over and an agent does
+// not reregister within the `agent_reregister_timeout`, the agent is
+// marked unreachable; the framework should NOT receive a status
+// update for any tasks running on the agent, but reconciliation
+// should indicate the agent is unreachable.
+TEST_F(MasterTest, UnreachableTaskAfterFailover)
+{
+  // Step 1: Start a master.
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // Step 2: Start a slave.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  StandaloneMasterDetector slaveDetector(master.get()->pid);
+  Try<Owned<cluster::Slave>> slave = StartSlave(&slaveDetector);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+  const SlaveID slaveId = slaveRegisteredMessage.get().slave_id();
+
+  // Step 3: Start a scheduler.
+  StandaloneMasterDetector schedDetector(master.get()->pid);
+  MockScheduler sched;
+  TestingMesosSchedulerDriver driver(
+      &sched, &schedDetector, DEFAULT_FRAMEWORK_INFO);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task = createTask(offers.get()[0], "sleep 100");
+
+  Future<TaskStatus> runningStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&runningStatus));
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(runningStatus);
+  EXPECT_EQ(TASK_RUNNING, runningStatus.get().state());
+  EXPECT_EQ(task.task_id(), runningStatus.get().task_id());
+
+  // Step 4: Simulate master failover. We leave the slave without a
+  // master so it does not attempt to re-register.
+  slaveDetector.appoint(None());
+
+  master->reset();
+  master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // Cause the scheduler to re-register with the master.
+  Future<Nothing> disconnected;
+  EXPECT_CALL(sched, disconnected(&driver))
+    .WillOnce(FutureSatisfy(&disconnected));
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  schedDetector.appoint(master.get()->pid);
+
+  AWAIT_READY(disconnected);
+  AWAIT_READY(registered);
+
+  Future<Nothing> slaveLost;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(FutureSatisfy(&slaveLost));
+
+  // Trigger the slave re-registration timeout.
+  Clock::pause();
+  Clock::advance(masterFlags.agent_reregister_timeout);
+  TimeInfo unreachableTime = protobuf::getCurrentTime();
+
+  // We expect to get a `slaveLost` signal; we do NOT expect to get a
+  // status update for the task that was running on the slave.
+  AWAIT_READY(slaveLost);
+
+  // Reconciliation should return TASK_LOST, with `unreachable_time`
+  // equal to the time when the re-registration timeout fired.
+  TaskStatus status;
+  status.mutable_task_id()->CopyFrom(task.task_id());
+  status.mutable_slave_id()->CopyFrom(slaveId);
+  status.set_state(TASK_STAGING); // Dummy value.
+
+  Future<TaskStatus> reconcileUpdate1;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&reconcileUpdate1));
+
+  driver.reconcileTasks({status});
+
+  AWAIT_READY(reconcileUpdate1);
+  EXPECT_EQ(TASK_LOST, reconcileUpdate1.get().state());
+  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate1.get().reason());
+  EXPECT_EQ(unreachableTime, reconcileUpdate1.get().unreachable_time());
+
+  // Cause the slave to re-register with the master.
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  slaveDetector.appoint(master.get()->pid);
+
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // The task should have returned to TASK_RUNNING. This is true even
+  // for non-partition-aware frameworks, since we emulate the old
+  // "non-strict registry" semantics.
+  Future<TaskStatus> reconcileUpdate2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&reconcileUpdate2));
+
+  driver.reconcileTasks({status});
+
+  AWAIT_READY(reconcileUpdate2);
+  EXPECT_EQ(TASK_RUNNING, reconcileUpdate2.get().state());
+  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate2.get().reason());
+
+  Clock::resume();
+
+  JSON::Object stats = Metrics();
+  EXPECT_EQ(0, stats.values["master/tasks_lost"]);
+  EXPECT_EQ(0, stats.values["master/tasks_unreachable"]);
+  EXPECT_EQ(1, stats.values["master/tasks_running"]);
+  EXPECT_EQ(1, stats.values["master/slave_unreachable_scheduled"]);
+  EXPECT_EQ(1, stats.values["master/slave_unreachable_completed"]);
+  EXPECT_EQ(1, stats.values["master/slave_removals"]);
+  EXPECT_EQ(1, stats.values["master/slave_removals/reason_unhealthy"]);
+  EXPECT_EQ(0, stats.values["master/slave_removals/reason_unregistered"]);
+  EXPECT_EQ(1, stats.values["master/recovery_slave_removals"]);
+
+  driver.stop();
+  driver.join();
+}
+
+
 // This test ensures that slave removals during master recovery
 // are rate limited.
 TEST_F(MasterTest, RateLimitRecoveredSlaveRemoval)