Repository: mesos
Updated Branches:
  refs/heads/master 6b1aa0084 -> a064505e4


Added test to simulate slow/unresponsive fetch.

Added test to simulate the scenario of slow/unresponsive HDFS leading
to an executor registration timeout and verify that the slave gets
notified of the failure.

Review: https://reviews.apache.org/r/50000/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a064505e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a064505e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a064505e

Branch: refs/heads/master
Commit: a064505e411fe78a257e9b336a888f1eeddaa949
Parents: 6b1aa00
Author: Megha Sharma <mshar...@apple.com>
Authored: Mon Aug 22 14:51:07 2016 -0700
Committer: Jiang Yan Xu <xuj...@apple.com>
Committed: Tue Aug 23 09:42:31 2016 -0700

----------------------------------------------------------------------
 src/tests/slave_tests.cpp | 129 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 129 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a064505e/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 30ca3da..dcf8454 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -263,6 +263,135 @@ TEST_F(SlaveTest, ShutdownUnregisteredExecutor)
 }
 
 
+// This test verifies that the agent reports a task launch failure
+// to the scheduler (TASK_FAILED and executorLost) when the executor
+// registration timeout fires because URI fetching is slow/hung.
+TEST_F(SlaveTest, ExecutorTimeoutCausedBySlowFetch)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Fake Hadoop install in the cwd; `flags.hadoop_home` points here.
+  string hadoopPath = os::getcwd();
+  string hadoopBinPath = path::join(hadoopPath, "bin");
+  ASSERT_SOME(os::mkdir(hadoopBinPath));
+  ASSERT_SOME(os::chmod(hadoopBinPath, S_IRWXU | S_IRWXG | S_IRWXO));
+
+  // A spurious "hadoop" script that hangs for 1000 (real) seconds.
+  string mockHadoopScript = "#!/usr/bin/env bash\n"
+                            "sleep 1000";
+
+  string hadoopCommand = path::join(hadoopBinPath, "hadoop");
+  ASSERT_SOME(os::write(hadoopCommand, mockHadoopScript));
+  ASSERT_SOME(os::chmod(hadoopCommand,
+                        S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH));
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.hadoop_home = hadoopPath;
+
+  Fetcher fetcher;
+  Try<MesosContainerizer*> _containerizer = MesosContainerizer::create(
+      flags, true, &fetcher);
+
+  ASSERT_SOME(_containerizer);
+  Owned<MesosContainerizer> containerizer(_containerizer.get());
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(
+      detector.get(),
+      containerizer.get(),
+      flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched,
+      DEFAULT_FRAMEWORK_INFO,
+      master.get()->pid,
+      DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty()); // `offers.get()[0]` is used below.
+
+  // Launch a task with the command executor.
+  // The task uses a URI that needs to be fetched by the HDFS client
+  // and will be blocked until the executor registration times out.
+  CommandInfo commandInfo;
+  CommandInfo::URI* uri = commandInfo.add_uris();
+  uri->set_value(path::join("hdfs://dummyhost/dummypath", "test"));
+
+  // Using a dummy command value as it's a required field. The
+  // command won't be invoked.
+  commandInfo.set_value("sleep 10");
+
+  ExecutorID executorId;
+  executorId.set_value("test-executor-staging");
+
+  TaskInfo task = createTask(
+      offers.get()[0].slave_id(),
+      offers.get()[0].resources(),
+      commandInfo,
+      executorId,
+      "test-task-staging");
+
+  // Set up all expectations before launching to avoid races.
+  Future<Nothing> fetch = FUTURE_DISPATCH(
+      _, &FetcherProcess::fetch);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  Future<Nothing> executorLost;
+  EXPECT_CALL(sched, executorLost(&driver, executorId, _, _))
+    .WillOnce(FutureSatisfy(&executorLost));
+
+  // Ensure that the slave times out and kills the executor.
+  Future<Nothing> destroyExecutor = FUTURE_DISPATCH(
+      _, &MesosContainerizerProcess::destroy);
+
+  // Pause so the timeout only fires when we advance the clock.
+  Clock::pause();
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(fetch); // Fetch dispatched; it should hang on `hadoop`.
+
+  Clock::advance(flags.executor_registration_timeout);
+  AWAIT_READY(destroyExecutor);
+
+  Clock::settle(); // Wait for Containerizer::destroy to complete.
+
+  // Now advance time until the reaper reaps the executor.
+  while (status.isPending()) {
+    Clock::advance(process::MAX_REAP_INTERVAL());
+    Clock::settle();
+  }
+
+  AWAIT_READY(executorLost);
+
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_FAILED, status->state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source());
+  EXPECT_EQ(TaskStatus::REASON_CONTAINER_LAUNCH_FAILED, status->reason());
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+}
+
+
 // This test verifies that when an executor terminates before
 // registering with slave, it is properly cleaned up.
 TEST_F(SlaveTest, RemoveUnregisteredTerminatedExecutor)

Reply via email to