Added support for `max_completion_time` in the docker executor.

If `TaskInfo.max_completion_time` is set, the docker executor will kill
the task as soon as it has been running for longer than that duration.
The timeout handler calls `killTask` with a zero grace period to achieve
a forced kill ignoring any `KillPolicy`.

The framework should only receive a `TASK_FAILED` state with the
`REASON_MAX_COMPLETION_TIME_REACHED` reason.

Review: https://reviews.apache.org/r/66283/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d86a7fed
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d86a7fed
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d86a7fed

Branch: refs/heads/master
Commit: d86a7fed949ebc4c51a89c4fa7adef91ac5de9dc
Parents: 0278ac9
Author: Zhitao Li <zhitaoli...@gmail.com>
Authored: Thu May 3 08:34:44 2018 -0700
Committer: James Peach <jpe...@apache.org>
Committed: Thu May 3 08:34:44 2018 -0700

----------------------------------------------------------------------
 src/docker/executor.cpp | 68 +++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 64 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d86a7fed/src/docker/executor.cpp
----------------------------------------------------------------------
diff --git a/src/docker/executor.cpp b/src/docker/executor.cpp
index 1d67211..16509da 100644
--- a/src/docker/executor.cpp
+++ b/src/docker/executor.cpp
@@ -99,6 +99,7 @@ public:
       killed(false),
       terminated(false),
       killedByHealthCheck(false),
+      killedByTaskCompletionTimeout(false),
       launcherDir(launcherDir),
       docker(docker),
       containerName(containerName),
@@ -159,6 +160,22 @@ public:
       killPolicy = task.kill_policy();
     }
 
+    // Setup timer for max_completion_time.
+    if (task.max_completion_time().nanoseconds() > 0) {
+      Duration duration = Nanoseconds(task.max_completion_time().nanoseconds());
+
+      LOG(INFO) << "Task " << taskId.get() << " has a max completion time of "
+                << duration;
+
+      taskCompletionTimer = delay(
+          duration,
+          self(),
+          &Self::taskCompletionTimeout,
+          driver,
+          task.task_id(),
+          duration);
+    }
+
     LOG(INFO) << "Starting task " << taskId.get();
 
     // Send initial TASK_STARTING update.
@@ -387,6 +404,29 @@ public:
     killTask(driver, taskId, gracePeriod);
   }
 
+  void taskCompletionTimeout(
+      ExecutorDriver* driver, const TaskID& taskId, const Duration& duration)
+  {
+    if (killed) {
+      return;
+    }
+
+    if (terminated) {
+      return;
+    }
+
+    LOG(INFO) << "Killing task " << taskId
+              << " which exceeded its maximum completion time of " << duration;
+
+    taskCompletionTimer = None();
+    killedByTaskCompletionTimeout = true;
+    killed = true;
+
+    // Use a zero grace period to kill the task, in order to ignore the
+    // `KillPolicy`.
+    killTask(driver, taskId, Duration::zero());
+  }
+
   void frameworkMessage(ExecutorDriver* driver, const string& data) {}
 
   void shutdown(ExecutorDriver* driver)
@@ -467,6 +507,12 @@ private:
       return;
     }
 
+    // Cancel the taskCompletionTimer if it is set and ongoing.
+    if (taskCompletionTimer.isSome()) {
+      Clock::cancel(taskCompletionTimer.get());
+      taskCompletionTimer = None();
+    }
+
     // Terminate if a kill task request is received before the task is launched.
     // This can happen, for example, if `RunTaskMessage` has not been delivered.
     // See MESOS-8297.
@@ -522,8 +568,10 @@ private:
       if (!killed) {
         killed = true;
 
-        // Send TASK_KILLING if the framework can handle it.
-        if (protobuf::frameworkHasCapability(
+        // Send TASK_KILLING if task is not killed by completion timeout and
+        // the framework can handle it.
+        if (!killedByTaskCompletionTimeout &&
+            protobuf::frameworkHasCapability(
                 frameworkInfo.get(),
                 FrameworkInfo::Capability::TASK_KILLING_STATE)) {
           // TODO(alexr): Use `protobuf::createTaskStatus()`
@@ -531,6 +579,7 @@ private:
           TaskStatus status;
           status.mutable_task_id()->CopyFrom(taskId.get());
           status.set_state(TASK_KILLING);
+
           driver.get()->sendStatusUpdate(status);
         }
 
@@ -639,6 +688,7 @@ private:
   void _reaped(const Future<Option<int>>& run)
   {
     TaskState state;
+    Option<TaskStatus::Reason> reason = None();
     string message;
 
     if (!run.isReady()) {
@@ -654,7 +704,10 @@ private:
       CHECK(WIFEXITED(status) || WIFSIGNALED(status))
         << "Unexpected wait status " << status;
 
-      if (killed) {
+      if (killedByTaskCompletionTimeout) {
+        state = TASK_FAILED;
+        reason = TaskStatus::REASON_MAX_COMPLETION_TIME_REACHED;
+      } else if (killed) {
         // Send TASK_KILLED if the task was killed as a result of
         // kill() or shutdown(). Note that in general there is a
         // race between signaling the container and it terminating
@@ -683,12 +736,17 @@ private:
     taskStatus.mutable_task_id()->CopyFrom(taskId.get());
     taskStatus.set_state(state);
     taskStatus.set_message(message);
+
     if (killed && killedByHealthCheck) {
       // TODO(abudnik): Consider specifying appropriate status update reason,
       // saying that the task was killed due to a failing health check.
       taskStatus.set_healthy(false);
     }
 
+    if (reason.isSome()) {
+      taskStatus.set_reason(reason.get());
+    }
+
     CHECK_SOME(driver);
     driver.get()->sendStatusUpdate(taskStatus);
 
@@ -777,8 +835,10 @@ private:
   // see MESOS-5252.
   bool killed;
   bool terminated;
-
   bool killedByHealthCheck;
+  bool killedByTaskCompletionTimeout;
+
+  Option<Timer> taskCompletionTimer;
 
   string launcherDir;
   Owned<Docker> docker;

Reply via email to