Added a test to verify executor driver message dropping behavior.

This patch adds a test which verifies that the executor driver will
drop RunTaskMessages when it is not connected to the agent.
Review: https://reviews.apache.org/r/59583/

Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f6c2fdb6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f6c2fdb6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f6c2fdb6

Branch: refs/heads/master
Commit: f6c2fdb65703845b7048aeb5ad0ec78ffbe3a5ab
Parents: 9e5bd10
Author: Greg Mann <g...@mesosphere.io>
Authored: Thu Jun 1 12:17:08 2017 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Thu Jun 1 12:17:08 2017 -0700

----------------------------------------------------------------------
 src/tests/slave_tests.cpp | 127 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 127 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/mesos/blob/f6c2fdb6/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 2c3a7a6..0bab752 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -7211,6 +7211,133 @@ TEST_F(SlaveTest, IgnoreV0ExecutorIfItReregistersWithoutReconnect)
 }
 
 
+// This test verifies that a disconnected PID-based executor will drop
+// RunTaskMessages.
+TEST_F(SlaveTest, DisconnectedExecutorDropsMessages)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = this->CreateMasterFlags();
+
+  Try<Owned<cluster::Master>> master = this->StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = this->CreateSlaveFlags();
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    this->StartSlave(detector.get(), &containerizer, slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Enable checkpointing for the framework so that the executor continues
+  // running after agent termination.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, false, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  Clock::advance(masterFlags.allocation_interval);
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty()); // Fatal: `offers.get()[0]` is used below.
+
+  FrameworkID frameworkId = offers.get()[0].framework_id();
+
+  TaskInfo runningTask =
+    createTask(offers.get()[0], "sleep 1000", DEFAULT_EXECUTOR_ID);
+
+  // Capture the executor registration message to get the executor's pid.
+  Future<Message> registerExecutor =
+    FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  // Capture the `RunTaskMessage` so that we can use the framework pid to spoof
+  // another `RunTaskMessage` later.
+  Future<RunTaskMessage> capturedRunTaskMessage =
+    FUTURE_PROTOBUF(RunTaskMessage(), master.get()->pid, slave.get()->pid);
+
+  // In addition to returning the expected task status here, this expectation
+  // will also ensure that the spoofed `RunTaskMessage` we send later does not
+  // trigger a call to `launchTask`.
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> statusUpdate;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusUpdate));
+
+  driver.launchTasks(offers.get()[0].id(), {runningTask});
+
+  AWAIT_READY(registerExecutor);
+  UPID executorPid = registerExecutor->from;
+
+  AWAIT_READY(capturedRunTaskMessage);
+
+  AWAIT_READY(statusUpdate);
+  ASSERT_EQ(TASK_RUNNING, statusUpdate->state());
+
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  driver.acknowledgeStatusUpdate(statusUpdate.get());
+
+  AWAIT_READY(_statusUpdateAcknowledgement);
+
+  // Ensure that the executor continues running after agent termination.
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(0);
+
+  // Terminate the agent so that the executor becomes disconnected.
+  slave.get()->terminate();
+
+  Clock::settle();
+
+  TaskInfo droppedTask =
+    createTask(offers.get()[0], "sleep 1000", DEFAULT_EXECUTOR_ID);
+
+  RunTaskMessage runTaskMessage;
+  runTaskMessage.mutable_framework_id()->CopyFrom(frameworkId);
+  runTaskMessage.mutable_framework()->CopyFrom(frameworkInfo);
+  runTaskMessage.mutable_task()->CopyFrom(droppedTask);
+  runTaskMessage.set_pid(capturedRunTaskMessage->pid());
+
+  // Send the executor a `RunTaskMessage` while it's disconnected.
+  // This message should be dropped.
+  process::post(executorPid, runTaskMessage);
+
+  // Settle the clock to ensure that the `RunTaskMessage` is processed. If it is
+  // not ignored, the test would fail due to a violation of the expectation we
+  // previously registered on `Executor::launchTask`.
+  Clock::settle();
+
+  // Executor may call shutdown during test teardown.
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Clock::resume();
+}
+
+
 // This test verifies that the 'executor_reregistration_timeout' agent flag
 // successfully extends the timeout within which an executor can re-register.
 TEST_F(SlaveTest, ExecutorReregistrationTimeoutFlag)