Added a test to verify executor driver message dropping behavior.

This patch adds a test which verifies that the executor driver will drop
RunTaskMessages when it is not connected to the agent.

Review: https://reviews.apache.org/r/59583/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f6c2fdb6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f6c2fdb6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f6c2fdb6

Branch: refs/heads/master
Commit: f6c2fdb65703845b7048aeb5ad0ec78ffbe3a5ab
Parents: 9e5bd10
Author: Greg Mann <g...@mesosphere.io>
Authored: Thu Jun 1 12:17:08 2017 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Thu Jun 1 12:17:08 2017 -0700

----------------------------------------------------------------------
 src/tests/slave_tests.cpp | 127 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 127 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f6c2fdb6/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 2c3a7a6..0bab752 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -7211,6 +7211,133 @@ TEST_F(SlaveTest, IgnoreV0ExecutorIfItReregistersWithoutReconnect)
 }
 
 
+// This test verifies that a PID-based executor which has been disconnected
+// from its agent drops any `RunTaskMessage`s it receives.
+TEST_F(SlaveTest, DisconnectedExecutorDropsMessages)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = this->CreateMasterFlags();
+
+  Try<Owned<cluster::Master>> master = this->StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = this->CreateSlaveFlags();
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    this->StartSlave(detector.get(), &containerizer, slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Enable checkpointing for the framework so that the executor continues
+  // running after agent termination.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, false, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  Clock::advance(masterFlags.allocation_interval);
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+
+  FrameworkID frameworkId = offers.get()[0].framework_id();
+
+  TaskInfo runningTask =
+    createTask(offers.get()[0], "sleep 1000", DEFAULT_EXECUTOR_ID);
+
+  // Capture the executor registration message to get the executor's pid.
+  Future<Message> registerExecutor =
+    FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  // Capture the `RunTaskMessage` so that we can use the framework pid to spoof
+  // another `RunTaskMessage` later.
+  Future<RunTaskMessage> capturedRunTaskMessage =
+    FUTURE_PROTOBUF(RunTaskMessage(), master.get()->pid, slave.get()->pid);
+
+  // In addition to returning the expected task status here, this expectation
+  // will also ensure that the spoofed `RunTaskMessage` we send later does not
+  // trigger a call to `launchTask`.
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> statusUpdate;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusUpdate));
+
+  driver.launchTasks(offers.get()[0].id(), {runningTask});
+
+  AWAIT_READY(registerExecutor);
+  UPID executorPid = registerExecutor->from;
+
+  AWAIT_READY(capturedRunTaskMessage);
+
+  AWAIT_READY(statusUpdate);
+  ASSERT_EQ(TASK_RUNNING, statusUpdate->state());
+
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  driver.acknowledgeStatusUpdate(statusUpdate.get());
+
+  AWAIT_READY(_statusUpdateAcknowledgement);
+
+  // Ensure that the executor continues running after agent termination.
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(0);
+
+  // Terminate the agent so that the executor becomes disconnected.
+  slave.get()->terminate();
+
+  Clock::settle();
+
+  TaskInfo droppedTask =
+    createTask(offers.get()[0], "sleep 1000", DEFAULT_EXECUTOR_ID);
+
+  RunTaskMessage runTaskMessage;
+  runTaskMessage.mutable_framework_id()->CopyFrom(frameworkId);
+  runTaskMessage.mutable_framework()->CopyFrom(frameworkInfo);
+  runTaskMessage.mutable_task()->CopyFrom(droppedTask);
+  runTaskMessage.set_pid(capturedRunTaskMessage->pid());
+
+  // Send the executor a `RunTaskMessage` while it's disconnected.
+  // This message should be dropped.
+  process::post(executorPid, runTaskMessage);
+
+  // Settle the clock to ensure that the `RunTaskMessage` is processed. If it is
+  // not ignored, the test would fail due to a violation of the expectation we
+  // previously registered on `Executor::launchTask`.
+  Clock::settle();
+
+  // Executor may call shutdown during test teardown.
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Clock::resume();
+}
+
+
 // This test verifies that the 'executor_reregistration_timeout' agent flag
 // successfully extends the timeout within which an executor can re-register.
 TEST_F(SlaveTest, ExecutorReregistrationTimeoutFlag)

Reply via email to