Repository: mesos
Updated Branches:
  refs/heads/master 1afc364c0 -> f6c2fdb65


Added test for agent ping timeout during agent recovery.

This patch adds a new test,
`SlaveRecoveryTest.PingTimeoutDuringRecovery`, which verifies
that the agent will reply to pings from the master while it
is performing recovery.

Review: https://reviews.apache.org/r/59463/
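
As a rough illustration of the timing margin the test relies on, here is a
standalone sketch (plain std::chrono rather than Mesos/stout types; the
values mirror the flags the test sets):

    #include <cassert>
    #include <chrono>

    int main()
    {
      using namespace std::chrono_literals;

      // Flag values chosen by the test.
      const auto agentPingTimeout = 1s;               // --agent_ping_timeout
      const int maxAgentPingTimeouts = 2;             // --max_agent_ping_timeouts
      const auto executorReregistrationTimeout = 15s; // --executor_reregistration_timeout

      // An unresponsive agent would be marked unreachable after this long.
      const auto unreachableDeadline =
          agentPingTimeout * (maxAgentPingTimeouts + 1); // 3 seconds

      // Agent recovery currently waits out the executor re-registration
      // timeout (see MESOS-7539), so recovery outlasts the unreachable
      // deadline and the test can observe pings being answered mid-recovery.
      assert(executorReregistrationTimeout > unreachableDeadline);
      return 0;
    }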


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2998026b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2998026b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2998026b

Branch: refs/heads/master
Commit: 2998026bb920924906fc1d4ddabc81397f8cd452
Parents: 1afc364
Author: Greg Mann <g...@mesosphere.io>
Authored: Thu Jun 1 12:16:56 2017 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Thu Jun 1 12:16:56 2017 -0700

----------------------------------------------------------------------
 src/tests/slave_recovery_tests.cpp | 151 ++++++++++++++++++++++++++++++++
 1 file changed, 151 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2998026b/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index e140f4d..86cf971 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -913,6 +913,157 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutorRetry)
 }
 
 
+// This test verifies that the agent responds to pings from the master while the
+// agent is performing recovery. We do this by setting the executor
+// re-registration timeout to a duration longer than (agent_ping_timeout *
+// (max_agent_ping_timeouts + 1)), and then confirming that the agent is not
+// marked unreachable after the max ping timeout has elapsed, even if all
+// executors have re-registered. Agent recovery currently does not complete
+// until the executor re-registration timeout has elapsed (see MESOS-7539).
+TYPED_TEST(SlaveRecoveryTest, PingTimeoutDuringRecovery)
+{
+  master::Flags masterFlags = this->CreateMasterFlags();
+  masterFlags.agent_ping_timeout = Seconds(1);
+  masterFlags.max_agent_ping_timeouts = 2;
+
+  Try<Owned<cluster::Master>> master = this->StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // Set the executor re-register timeout to a value greater than
+  // (agent_ping_timeout * (max_agent_ping_timeouts + 1)).
+  slave::Flags slaveFlags = this->CreateSlaveFlags();
+  slaveFlags.executor_reregistration_timeout = Seconds(15);
+
+  Fetcher fetcher(slaveFlags);
+
+  Try<TypeParam*> _containerizer =
+    TypeParam::create(slaveFlags, true, &fetcher);
+  ASSERT_SOME(_containerizer);
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  const string slavePid = "slave";
+
+  Try<Owned<cluster::Slave>> slave =
+    this->StartSlave(detector.get(), containerizer.get(), slavePid, slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, false, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers->size());
+
+  TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+
+  Future<TaskStatus> statusUpdate1;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusUpdate1));
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(statusUpdate1);
+  ASSERT_EQ(TASK_RUNNING, statusUpdate1->state());
+
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  driver.acknowledgeStatusUpdate(statusUpdate1.get());
+
+  AWAIT_READY(_statusUpdateAcknowledgement);
+
+  slave.get()->terminate();
+
+  Future<ReregisterExecutorMessage> reregisterExecutor =
+    FUTURE_PROTOBUF(ReregisterExecutorMessage(), _, _);
+
+  Future<ReregisterSlaveMessage> reregisterSlave =
+    FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _);
+
+  // Restart the slave (use same flags) with a new containerizer.
+  _containerizer = TypeParam::create(slaveFlags, true, &fetcher);
+  ASSERT_SOME(_containerizer);
+  containerizer.reset(_containerizer.get());
+
+  slave = this->StartSlave(
+      detector.get(),
+      containerizer.get(),
+      slavePid,
+      slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Ensure the executor re-registers.
+  AWAIT_READY(reregisterExecutor);
+
+  Clock::pause();
+
+  // Advance the clock enough to ensure that the agent would have been marked
+  // unreachable if it was not responding to pings.
+  for (unsigned int i = 0; i < masterFlags.max_agent_ping_timeouts + 2; i++) {
+    Future<PingSlaveMessage> ping = FUTURE_PROTOBUF(PingSlaveMessage(), _, _);
+    Future<PongSlaveMessage> pong = FUTURE_PROTOBUF(PongSlaveMessage(), _, _);
+
+    Clock::advance(masterFlags.agent_ping_timeout);
+
+    AWAIT_READY(ping);
+    AWAIT_READY(pong);
+  }
+
+  Clock::resume();
+
+  // Perform reconciliation to verify that the task has not been transitioned to
+  // TASK_LOST or TASK_UNREACHABLE, as would occur if the agent had been deemed
+  // unreachable.
+  vector<TaskStatus> statuses;
+
+  TaskStatus status;
+  status.mutable_task_id()->CopyFrom(task.task_id());
+  status.set_state(TASK_STAGING); // Dummy value.
+
+  statuses.push_back(status);
+
+  Future<TaskStatus> statusUpdate2;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&statusUpdate2))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  driver.reconcileTasks(statuses);
+
+  AWAIT_READY(statusUpdate2);
+  ASSERT_EQ(TASK_RUNNING, statusUpdate2->state());
+  ASSERT_EQ(TaskStatus::SOURCE_MASTER, statusUpdate2->source());
+  ASSERT_EQ(TaskStatus::REASON_RECONCILIATION, statusUpdate2->reason());
+
+  // Ensure that the agent has not re-registered yet.
+  ASSERT_TRUE(reregisterSlave.isPending());
+
+  // Advance the clock to prompt the agent to re-register.
+  Clock::pause();
+  Clock::advance(slaveFlags.executor_reregistration_timeout);
+  Clock::resume();
+
+  AWAIT_READY(reregisterSlave);
+
+  driver.stop();
+  driver.join();
+}
+
+
 // The slave is stopped before the HTTP based command executor is
 // registered. When it comes back up with recovery=reconnect, make
 // sure the executor is killed and the task is transitioned to LOST.
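
For context on the ping/pong exchange driven above: per the semantics of the
two master flags, the master pings each agent every `agent_ping_timeout` and
deems it unreachable once too many consecutive pings go unanswered. A
simplified, hypothetical model of that bookkeeping (not the actual master
code) might look like:

    #include <iostream>

    // Counts consecutive unanswered pings; the agent is deemed unreachable
    // once more than `maxTimeouts` pings in a row go unanswered, matching
    // the (max_agent_ping_timeouts + 1) bound in the test's comment.
    struct LivenessTracker
    {
      explicit LivenessTracker(int maxTimeouts) : maxTimeouts(maxTimeouts) {}

      // Called once per ping interval; returns true if the agent should now
      // be marked unreachable.
      bool onPingResult(bool pongReceived)
      {
        consecutiveTimeouts = pongReceived ? 0 : consecutiveTimeouts + 1;
        return consecutiveTimeouts > maxTimeouts;
      }

      const int maxTimeouts;
      int consecutiveTimeouts = 0;
    };

    int main()
    {
      LivenessTracker tracker(2); // --max_agent_ping_timeouts=2

      // A recovering-but-responsive agent answers every ping, so the counter
      // resets each interval and it is never marked unreachable, however long
      // executor re-registration takes.
      bool unreachable = false;
      for (int i = 0; i < 5; i++) {
        unreachable = tracker.onPingResult(true /* pongReceived */);
      }

      std::cout << (unreachable ? "unreachable" : "still reachable")
                << std::endl;
    }

To run just the new test from a Mesos build tree, something like
`make check GTEST_FILTER='*PingTimeoutDuringRecovery*'` should work via the
usual GoogleTest filter mechanism.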
