Added a test to verify the agent flag 'executor_reregistration_timeout'.

This patch adds a test to verify the correct behavior of
the agent flag 'executor_reregistration_timeout.

Review: https://reviews.apache.org/r/59545/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9e5bd102
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9e5bd102
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9e5bd102

Branch: refs/heads/master
Commit: 9e5bd10217d37166957cc6a4ebca82bceb2f8d74
Parents: 2998026
Author: Greg Mann <g...@mesosphere.io>
Authored: Thu Jun 1 12:17:01 2017 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Thu Jun 1 12:17:01 2017 -0700

----------------------------------------------------------------------
 src/tests/slave_tests.cpp | 135 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 135 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9e5bd102/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 927b9c3..2c3a7a6 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -7210,6 +7210,141 @@ TEST_F(SlaveTest, 
IgnoreV0ExecutorIfItReregistersWithoutReconnect)
   driver.join();
 }
 
+
+// This test verifies that the 'executor_reregistration_timeout' agent flag
+// successfully extends the timeout within which an executor can re-register.
+TEST_F(SlaveTest, ExecutorReregistrationTimeoutFlag)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = this->CreateMasterFlags();
+
+  Try<Owned<cluster::Master>> master = this->StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // Set the executor re-register timeout to a value greater than the default.
+  slave::Flags slaveFlags = this->CreateSlaveFlags();
+  slaveFlags.executor_reregistration_timeout = Seconds(15);
+
+  Fetcher fetcher(slaveFlags);
+
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(slaveFlags, true, &fetcher);
+  ASSERT_SOME(_containerizer);
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    this->StartSlave(detector.get(), containerizer.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, false, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  Clock::advance(masterFlags.allocation_interval);
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers->size());
+
+  TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+
+  Future<TaskStatus> statusUpdate1;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusUpdate1));
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(statusUpdate1);
+  ASSERT_EQ(TASK_RUNNING, statusUpdate1->state());
+
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  driver.acknowledgeStatusUpdate(statusUpdate1.get());
+
+  AWAIT_READY(_statusUpdateAcknowledgement);
+
+  slave.get()->terminate();
+
+  Future<ReregisterExecutorMessage> reregisterExecutor =
+    DROP_PROTOBUF(ReregisterExecutorMessage(), _, _);
+
+  Future<SlaveReregisteredMessage> slaveReregistered =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Restart the slave (use same flags) with a new containerizer.
+  _containerizer = MesosContainerizer::create(slaveFlags, true, &fetcher);
+  ASSERT_SOME(_containerizer);
+  containerizer.reset(_containerizer.get());
+
+  slave = this->StartSlave(detector.get(), containerizer.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Ensure that the executor attempts to re-register, so that we can capture
+  // its re-registration message.
+  AWAIT_READY(reregisterExecutor);
+
+  // Make sure that we're advancing the clock more than the default timeout.
+  ASSERT_TRUE(
+      slaveFlags.executor_reregistration_timeout * 0.9 >
+      slave::EXECUTOR_REREGISTRATION_TIMEOUT);
+  Clock::advance(slaveFlags.executor_reregistration_timeout * 0.9);
+
+  // Send the executor's delayed re-registration message.
+  process::post(slave.get()->pid, reregisterExecutor.get());
+
+  // Advance the clock to prompt the agent to re-register, and ensure that the
+  // executor's task would have been marked unreachable if the executor had not
+  // re-registered successfully.
+  Clock::advance(slaveFlags.executor_reregistration_timeout * 0.2);
+
+  Clock::resume();
+
+  AWAIT_READY(slaveReregistered);
+
+  // Perform reconciliation to verify that the task has not been transitioned 
to
+  // TASK_LOST or TASK_UNREACHABLE, as would occur if the agent had been deemed
+  // unreachable.
+  vector<TaskStatus> statuses;
+
+  TaskStatus status;
+  status.mutable_task_id()->CopyFrom(task.task_id());
+  status.set_state(TASK_STAGING); // Dummy value.
+
+  statuses.push_back(status);
+
+  Future<TaskStatus> statusUpdate2;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&statusUpdate2))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  driver.reconcileTasks(statuses);
+
+  AWAIT_READY(statusUpdate2);
+  ASSERT_EQ(TASK_RUNNING, statusUpdate2->state());
+  ASSERT_EQ(TaskStatus::SOURCE_MASTER, statusUpdate2->source());
+  ASSERT_EQ(TaskStatus::REASON_RECONCILIATION, statusUpdate2->reason());
+
+  driver.stop();
+  driver.join();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

Reply via email to