Repository: mesos Updated Branches: refs/heads/master 6b1aa0084 -> a064505e4
Added test to simulate slow/unresponsive fetch. Added test to simulate the scenario of slow/unresponsive HDFS leading to executor register timeout and verify that slave gets notified of the failure. Review: https://reviews.apache.org/r/50000/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a064505e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a064505e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a064505e Branch: refs/heads/master Commit: a064505e411fe78a257e9b336a888f1eeddaa949 Parents: 6b1aa00 Author: Megha Sharma <mshar...@apple.com> Authored: Mon Aug 22 14:51:07 2016 -0700 Committer: Jiang Yan Xu <xuj...@apple.com> Committed: Tue Aug 23 09:42:31 2016 -0700 ---------------------------------------------------------------------- src/tests/slave_tests.cpp | 129 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 129 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/a064505e/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index 30ca3da..dcf8454 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -263,6 +263,135 @@ TEST_F(SlaveTest, ShutdownUnregisteredExecutor) } +// This test verifies that mesos agent gets notified of task +// launch failure triggered by the executor register timeout +// caused by slow URI fetching. +TEST_F(SlaveTest, ExecutorTimeoutCausedBySlowFetch) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + string hadoopPath = os::getcwd(); + string hadoopBinPath = path::join(hadoopPath, "bin"); + + ASSERT_SOME(os::mkdir(hadoopBinPath)); + ASSERT_SOME(os::chmod(hadoopBinPath, S_IRWXU | S_IRWXG | S_IRWXO)); + + // A spurious "hadoop" script that sleeps forever. 
+ string mockHadoopScript = "#!/usr/bin/env bash\n" + "sleep 1000"; + + string hadoopCommand = path::join(hadoopBinPath, "hadoop"); + ASSERT_SOME(os::write(hadoopCommand, mockHadoopScript)); + ASSERT_SOME(os::chmod(hadoopCommand, + S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH)); + + slave::Flags flags = CreateSlaveFlags(); + flags.hadoop_home = hadoopPath; + + Fetcher fetcher; + + Try<MesosContainerizer*> _containerizer = MesosContainerizer::create( + flags, true, &fetcher); + + CHECK_SOME(_containerizer); + Owned<MesosContainerizer> containerizer(_containerizer.get()); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave( + detector.get(), + containerizer.get(), + flags); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, + DEFAULT_FRAMEWORK_INFO, + master.get()->pid, + DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)) + .Times(1); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + // Launch a task with the command executor. + // The task uses a URI that needs to be fetched by the HDFS client + // and will be blocked until the executor registration times out. + CommandInfo commandInfo; + CommandInfo::URI* uri = commandInfo.add_uris(); + uri->set_value(path::join("hdfs://dummyhost/dummypath", "test")); + + // Using a dummy command value as it's a required field. The + // command won't be invoked. 
+ commandInfo.set_value("sleep 10"); + + ExecutorID executorId; + executorId.set_value("test-executor-staging"); + + TaskInfo task = createTask( + offers.get()[0].slave_id(), + offers.get()[0].resources(), + commandInfo, + executorId, + "test-task-staging"); + + Future<Nothing> fetch = FUTURE_DISPATCH( + _, &FetcherProcess::fetch); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status)); + + Clock::pause(); + + driver.launchTasks(offers.get()[0].id(), {task}); + + Future<Nothing> executorLost; + EXPECT_CALL(sched, executorLost(&driver, executorId, _, _)) + .WillOnce(FutureSatisfy(&executorLost)); + + // Ensure that the slave times out and kills the executor. + Future<Nothing> destroyExecutor = FUTURE_DISPATCH( + _, &MesosContainerizerProcess::destroy); + + AWAIT_READY(fetch); + + Clock::advance(flags.executor_registration_timeout); + + AWAIT_READY(destroyExecutor); + + Clock::settle(); // Wait for Containerizer::destroy to complete. + + // Now advance time until the reaper reaps the executor. + while (status.isPending()) { + Clock::advance(process::MAX_REAP_INTERVAL()); + Clock::settle(); + } + + AWAIT_READY(executorLost); + + AWAIT_READY(status); + ASSERT_EQ(TASK_FAILED, status->state()); + EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source()); + EXPECT_EQ(TaskStatus::REASON_CONTAINER_LAUNCH_FAILED, status->reason()); + + Clock::resume(); + + driver.stop(); + driver.join(); +} + + // This test verifies that when an executor terminates before // registering with slave, it is properly cleaned up. TEST_F(SlaveTest, RemoveUnregisteredTerminatedExecutor)