Repository: mesos
Updated Branches:
  refs/heads/master 1afc364c0 -> f6c2fdb65
Added test for agent ping timeout during agent recovery.

This patch adds a new test, `SlaveRecoveryTest.PingTimeoutDuringRecovery`,
which verifies that the agent will reply to pings from the master while it
is performing recovery.

Review: https://reviews.apache.org/r/59463/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2998026b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2998026b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2998026b

Branch: refs/heads/master
Commit: 2998026bb920924906fc1d4ddabc81397f8cd452
Parents: 1afc364
Author: Greg Mann <g...@mesosphere.io>
Authored: Thu Jun 1 12:16:56 2017 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Thu Jun 1 12:16:56 2017 -0700

----------------------------------------------------------------------
 src/tests/slave_recovery_tests.cpp | 151 ++++++++++++++++++++++++++++++++
 1 file changed, 151 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2998026b/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index e140f4d..86cf971 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -913,6 +913,157 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutorRetry)
 }
 
 
+// This test verifies that the agent responds to pings from the master while the
+// agent is performing recovery. We do this by setting the executor
+// re-registration timeout to a duration longer than (agent_ping_timeout *
+// (max_agent_ping_timeouts + 1)), and then confirming that the agent is not
+// marked unreachable after the max ping timeout has elapsed, even if all
+// executors have re-registered. Agent recovery currently does not complete
+// until the executor re-registration timeout has elapsed (see MESOS-7539).
+TYPED_TEST(SlaveRecoveryTest, PingTimeoutDuringRecovery)
+{
+  master::Flags masterFlags = this->CreateMasterFlags();
+  masterFlags.agent_ping_timeout = Seconds(1);
+  masterFlags.max_agent_ping_timeouts = 2;
+
+  Try<Owned<cluster::Master>> master = this->StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // Set the executor re-register timeout to a value greater than
+  // (agent_ping_timeout * (max_agent_ping_timeouts + 1)).
+  slave::Flags slaveFlags = this->CreateSlaveFlags();
+  slaveFlags.executor_reregistration_timeout = Seconds(15);
+
+  Fetcher fetcher(slaveFlags);
+
+  Try<TypeParam*> _containerizer =
+    TypeParam::create(slaveFlags, true, &fetcher);
+  ASSERT_SOME(_containerizer);
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  const string slavePid = "slave";
+
+  Try<Owned<cluster::Slave>> slave =
+    this->StartSlave(detector.get(), containerizer.get(), slavePid, slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, false, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers->size());
+
+  TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+
+  Future<TaskStatus> statusUpdate1;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusUpdate1));
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(statusUpdate1);
+  ASSERT_EQ(TASK_RUNNING, statusUpdate1->state());
+
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  driver.acknowledgeStatusUpdate(statusUpdate1.get());
+
+  AWAIT_READY(_statusUpdateAcknowledgement);
+
+  slave.get()->terminate();
+
+  Future<ReregisterExecutorMessage> reregisterExecutor =
+    FUTURE_PROTOBUF(ReregisterExecutorMessage(), _, _);
+
+  Future<ReregisterSlaveMessage> reregisterSlave =
+    FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _);
+
+  // Restart the slave (use same flags) with a new containerizer.
+  _containerizer = TypeParam::create(slaveFlags, true, &fetcher);
+  ASSERT_SOME(_containerizer);
+  containerizer.reset(_containerizer.get());
+
+  slave = this->StartSlave(
+      detector.get(),
+      containerizer.get(),
+      slavePid,
+      slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Ensure the executor re-registers.
+  AWAIT_READY(reregisterExecutor);
+
+  Clock::pause();
+
+  // Advance the clock enough to ensure that the agent would have been marked
+  // unreachable if it was not responding to pings.
+  for (unsigned int i = 0; i < masterFlags.max_agent_ping_timeouts + 2; i++) {
+    Future<PingSlaveMessage> ping = FUTURE_PROTOBUF(PingSlaveMessage(), _, _);
+    Future<PongSlaveMessage> pong = FUTURE_PROTOBUF(PongSlaveMessage(), _, _);
+
+    Clock::advance(masterFlags.agent_ping_timeout);
+
+    AWAIT_READY(ping);
+    AWAIT_READY(pong);
+  }
+
+  Clock::resume();
+
+  // Perform reconciliation to verify that the task has not been transitioned to
+  // TASK_LOST or TASK_UNREACHABLE, as would occur if the agent had been deemed
+  // unreachable.
+  vector<TaskStatus> statuses;
+
+  TaskStatus status;
+  status.mutable_task_id()->CopyFrom(task.task_id());
+  status.set_state(TASK_STAGING); // Dummy value.
+
+  statuses.push_back(status);
+
+  Future<TaskStatus> statusUpdate2;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&statusUpdate2))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  driver.reconcileTasks(statuses);
+
+  AWAIT_READY(statusUpdate2);
+  ASSERT_EQ(TASK_RUNNING, statusUpdate2->state());
+  ASSERT_EQ(TaskStatus::SOURCE_MASTER, statusUpdate2->source());
+  ASSERT_EQ(TaskStatus::REASON_RECONCILIATION, statusUpdate2->reason());
+
+  // Ensure that the agent has not re-registered yet.
+  ASSERT_TRUE(reregisterSlave.isPending());
+
+  // Advance the clock to prompt the agent to re-register.
+  Clock::pause();
+  Clock::advance(slaveFlags.executor_reregistration_timeout);
+  Clock::resume();
+
+  AWAIT_READY(reregisterSlave);
+
+  driver.stop();
+  driver.join();
+}
+
+
 // The slave is stopped before the HTTP based command executor is
 // registered. When it comes back up with recovery=reconnect, make
 // sure the executor is killed and the task is transitioned to LOST.
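For reference, the timing invariant the test relies on: the master marks a
silent agent unreachable after agent_ping_timeout * (max_agent_ping_timeouts
+ 1) = 1s * 3 = 3s, while recovery lasts at least the 15s executor
re-registration timeout (see MESOS-7539), so the clock can be advanced past
the unreachable window while the agent is still recovering. Below is a
minimal standalone sketch of that arithmetic, using plain std::chrono rather
than Mesos' Duration type; the constant names are illustrative, not from the
patch.

#include <cassert>
#include <chrono>
#include <iostream>

int main()
{
  using std::chrono::seconds;

  // Flag values mirroring the test above.
  const seconds agentPingTimeout(1);               // --agent_ping_timeout
  const int maxAgentPingTimeouts = 2;              // --max_agent_ping_timeouts
  const seconds executorReregistrationTimeout(15); // --executor_reregistration_timeout

  // Window after which the master deems an unresponsive agent unreachable.
  const seconds unreachableWindow =
    agentPingTimeout * (maxAgentPingTimeouts + 1); // 1s * 3 = 3s

  // Recovery lasts at least the executor re-registration timeout, so it
  // must outlast the unreachable window for the test to observe pings
  // being answered while recovery is still in progress.
  assert(executorReregistrationTimeout > unreachableWindow);

  std::cout << "unreachable window: " << unreachableWindow.count() << "s, "
            << "minimum recovery duration: "
            << executorReregistrationTimeout.count() << "s" << std::endl;

  return 0;
}

The same arithmetic explains the ping loop in the test: advancing the paused
clock max_agent_ping_timeouts + 2 times by agent_ping_timeout moves simulated
time well past the 3-second window, so a missing pong on any iteration would
have caused the master to mark the agent unreachable and reconciliation to
report TASK_LOST or TASK_UNREACHABLE instead of TASK_RUNNING.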