Added a test to verify the agent flag 'executor_reregistration_timeout'.

This patch adds a test to verify the correct behavior of the agent flag
'executor_reregistration_timeout'.
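For reviewers, a standalone sketch of the timing argument the test makes may
help; it is illustrative only and not part of the patch. The 15-second value
and the 0.9/0.2 clock advances are taken from the test below, while the
2-second default is an assumption standing in for the built-in constant
'slave::EXECUTOR_REREGISTRATION_TIMEOUT'.

    // Illustrative C++ only (not in this patch): models the test's clock
    // arithmetic. Compile with, e.g., 'g++ -std=c++11 sketch.cpp'.
    #include <cassert>

    int main()
    {
      const double defaultTimeoutSecs = 2.0;  // assumed stand-in for default
      const double flagTimeoutSecs = 15.0;    // value the test sets via flag

      const double firstAdvance = 0.9 * flagTimeoutSecs;   // 13.5s
      const double secondAdvance = 0.2 * flagTimeoutSecs;  //  3.0s

      // The delayed re-registration lands after the default window would
      // have expired but while the extended window is still open...
      assert(firstAdvance > defaultTimeoutSecs);
      assert(firstAdvance < flagTimeoutSecs);

      // ...and the total advance (1.1x the flag value) crosses the extended
      // window, so the timeout would have fired had it not been satisfied.
      assert(firstAdvance + secondAdvance > flagTimeoutSecs);

      return 0;
    }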
Review: https://reviews.apache.org/r/59545/

Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9e5bd102
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9e5bd102
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9e5bd102

Branch: refs/heads/master
Commit: 9e5bd10217d37166957cc6a4ebca82bceb2f8d74
Parents: 2998026
Author: Greg Mann <g...@mesosphere.io>
Authored: Thu Jun 1 12:17:01 2017 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Thu Jun 1 12:17:01 2017 -0700

----------------------------------------------------------------------
 src/tests/slave_tests.cpp | 135 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 135 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/mesos/blob/9e5bd102/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 927b9c3..2c3a7a6 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -7210,6 +7210,141 @@ TEST_F(SlaveTest, IgnoreV0ExecutorIfItReregistersWithoutReconnect)
   driver.join();
 }
 
+
+// This test verifies that the 'executor_reregistration_timeout' agent flag
+// successfully extends the timeout within which an executor can re-register.
+TEST_F(SlaveTest, ExecutorReregistrationTimeoutFlag)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = this->CreateMasterFlags();
+
+  Try<Owned<cluster::Master>> master = this->StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // Set the executor re-register timeout to a value greater than the default.
+  slave::Flags slaveFlags = this->CreateSlaveFlags();
+  slaveFlags.executor_reregistration_timeout = Seconds(15);
+
+  Fetcher fetcher(slaveFlags);
+
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(slaveFlags, true, &fetcher);
+  ASSERT_SOME(_containerizer);
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    this->StartSlave(detector.get(), containerizer.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, false, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  Clock::advance(masterFlags.allocation_interval);
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers->size());
+
+  TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+
+  Future<TaskStatus> statusUpdate1;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusUpdate1));
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(statusUpdate1);
+  ASSERT_EQ(TASK_RUNNING, statusUpdate1->state());
+
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  driver.acknowledgeStatusUpdate(statusUpdate1.get());
+
+  AWAIT_READY(_statusUpdateAcknowledgement);
+
+  slave.get()->terminate();
+
+  Future<ReregisterExecutorMessage> reregisterExecutor =
+    DROP_PROTOBUF(ReregisterExecutorMessage(), _, _);
+
+  Future<SlaveReregisteredMessage> slaveReregistered =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Restart the slave (use same flags) with a new containerizer.
+  _containerizer = MesosContainerizer::create(slaveFlags, true, &fetcher);
+  ASSERT_SOME(_containerizer);
+  containerizer.reset(_containerizer.get());
+
+  slave = this->StartSlave(detector.get(), containerizer.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Ensure that the executor attempts to re-register, so that we can capture
+  // its re-registration message.
+  AWAIT_READY(reregisterExecutor);
+
+  // Make sure that we're advancing the clock more than the default timeout.
+  ASSERT_TRUE(
+      slaveFlags.executor_reregistration_timeout * 0.9 >
+      slave::EXECUTOR_REREGISTRATION_TIMEOUT);
+  Clock::advance(slaveFlags.executor_reregistration_timeout * 0.9);
+
+  // Send the executor's delayed re-registration message.
+  process::post(slave.get()->pid, reregisterExecutor.get());
+
+  // Advance the clock to prompt the agent to re-register, and ensure that the
+  // executor's task would have been marked unreachable if the executor had not
+  // re-registered successfully.
+  Clock::advance(slaveFlags.executor_reregistration_timeout * 0.2);
+
+  Clock::resume();
+
+  AWAIT_READY(slaveReregistered);
+
+  // Perform reconciliation to verify that the task has not been transitioned
+  // to TASK_LOST or TASK_UNREACHABLE, as would occur if the agent had been
+  // deemed unreachable.
+  vector<TaskStatus> statuses;
+
+  TaskStatus status;
+  status.mutable_task_id()->CopyFrom(task.task_id());
+  status.set_state(TASK_STAGING); // Dummy value.
+
+  statuses.push_back(status);
+
+  Future<TaskStatus> statusUpdate2;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&statusUpdate2))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  driver.reconcileTasks(statuses);
+
+  AWAIT_READY(statusUpdate2);
+  ASSERT_EQ(TASK_RUNNING, statusUpdate2->state());
+  ASSERT_EQ(TaskStatus::SOURCE_MASTER, statusUpdate2->source());
+  ASSERT_EQ(TaskStatus::REASON_RECONCILIATION, statusUpdate2->reason());
+
+  driver.stop();
+  driver.join();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {
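Side note for verification: the new case should be runnable on its own with
the usual gtest filter, e.g.
'./bin/mesos-tests.sh --gtest_filter=SlaveTest.ExecutorReregistrationTimeoutFlag'
from the build directory (invocation assumes the standard Mesos build layout).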