Added support for `max_completion_time` in the default executor.

When a task group has multiple tasks:
- each task can have its own `max_completion_time`, or not have one;
- if a task succeeds before its `max_completion_time`, all other tasks
will keep running;
- if a task fails, all other tasks in the same group will fail (as
before);
- if a task does not succeed before its `max_completion_time`, it will
fail with `TASK_FAILED` and reason `REASON_MAX_COMPLETION_TIME_REACHED`,
while the other tasks will be killed without that reason.

Review: https://reviews.apache.org/r/66291/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a2ce5e45
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a2ce5e45
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a2ce5e45

Branch: refs/heads/master
Commit: a2ce5e459b89bc775a8de877d2050450938516ab
Parents: 7e11a2d
Author: Zhitao Li <zhitaoli...@gmail.com>
Authored: Thu May 3 08:34:58 2018 -0700
Committer: James Peach <jpe...@apache.org>
Committed: Thu May 3 08:34:58 2018 -0700

----------------------------------------------------------------------
 src/launcher/default_executor.cpp | 75 ++++++++++++++++++++++++++--------
 1 file changed, 58 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a2ce5e45/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index ea0d425..76c6106 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -66,6 +66,7 @@ using process::Clock;
 using process::Failure;
 using process::Future;
 using process::Owned;
+using process::Timer;
 using process::UPID;
 
 using process::http::Connection;
@@ -129,6 +130,11 @@ private:
 
     // Set to true if the task group is in the process of being killed.
     bool killingTaskGroup;
+
+    // Set to true if the task has exceeded its max completion timeout.
+    bool killedByCompletionTimeout;
+
+    Option<Timer> maxCompletionTimer;
   };
 
 public:
@@ -608,6 +614,15 @@ protected:
         container->healthChecker = healthChecker.get();
       }
 
+      // Setup timer for max_completion_time.
+      if (task.max_completion_time().nanoseconds() > 0) {
+        Duration duration =
+          Nanoseconds(task.max_completion_time().nanoseconds());
+
+        container->maxCompletionTimer = delay(
+            duration, self(), &Self::maxCompletion, task.task_id(), duration);
+      }
+
       // Currently, the Mesos agent does not expose the mapping from
       // `ContainerID` to `TaskID` for nested containers.
       // In order for the Web UI to access the task sandbox, we create
@@ -873,7 +888,10 @@ protected:
         CHECK(WIFEXITED(status) || WIFSIGNALED(status))
           << "Unexpected wait status " << status;
 
-        if (container->killing) {
+        if (container->killedByCompletionTimeout) {
+          taskState = TASK_FAILED;
+          reason = TaskStatus::REASON_MAX_COMPLETION_TIME_REACHED;
+        } else if (container->killing) {
           // Send TASK_KILLED if the task was killed as a result of
           // `killTask()` or `shutdown()`.
           taskState = TASK_KILLED;
@@ -1065,7 +1083,7 @@ protected:
 
   Future<Nothing> kill(
       Container* container,
-      const Option<KillPolicy>& killPolicy = None())
+      const Option<Duration>& _gracePeriod = None())
   {
     if (!container->launched) {
       // We can get here if we're killing a task group for which multiple
@@ -1073,6 +1091,11 @@ protected:
       return Nothing();
     }
 
+    if (container->maxCompletionTimer.isSome()) {
+      Clock::cancel(container->maxCompletionTimer.get());
+      container->maxCompletionTimer = None();
+    }
+
     CHECK(!container->killing);
     container->killing = true;
 
@@ -1105,19 +1128,8 @@ protected:
     // Default grace period is set to 3s.
     Duration gracePeriod = Seconds(3);
 
-    Option<KillPolicy> taskInfoKillPolicy;
-    if (container->taskInfo.has_kill_policy()) {
-      taskInfoKillPolicy = container->taskInfo.kill_policy();
-    }
-
-    // Kill policy provided in the `Kill` event takes precedence
-    // over kill policy specified when the task was launched.
-    if (killPolicy.isSome() && killPolicy->has_grace_period()) {
-      gracePeriod = Nanoseconds(killPolicy->grace_period().nanoseconds());
-    } else if (taskInfoKillPolicy.isSome() &&
-               taskInfoKillPolicy->has_grace_period()) {
-      gracePeriod =
-        Nanoseconds(taskInfoKillPolicy->grace_period().nanoseconds());
+    if (_gracePeriod.isSome()) {
+      gracePeriod = _gracePeriod.get();
     }
 
     LOG(INFO) << "Scheduling escalation to SIGKILL in " << gracePeriod
@@ -1135,7 +1147,8 @@ protected:
     // Send a 'TASK_KILLING' update if the framework can handle it.
     CHECK_SOME(frameworkInfo);
 
-    if (protobuf::frameworkHasCapability(
+    if (!container->killedByCompletionTimeout &&
+        protobuf::frameworkHasCapability(
             frameworkInfo.get(),
             FrameworkInfo::Capability::TASK_KILLING_STATE)) {
       TaskStatus status = createTaskStatus(taskId, TASK_KILLING);
@@ -1248,8 +1261,20 @@ protected:
       return;
     }
 
+    Option<Duration> gracePeriod = None();
+
+    // Kill policy provided in the `Kill` event takes precedence
+    // over kill policy specified when the task was launched.
+    if (killPolicy.isSome() && killPolicy->has_grace_period()) {
+      gracePeriod = Nanoseconds(killPolicy->grace_period().nanoseconds());
+    } else if (container->taskInfo.has_kill_policy() &&
+               container->taskInfo.kill_policy().has_grace_period()) {
+      gracePeriod = Nanoseconds(
+          container->taskInfo.kill_policy().grace_period().nanoseconds());
+    }
+
     const ContainerID& containerId = container->containerId;
-    kill(container, killPolicy)
+    kill(container, gracePeriod)
       .onFailed(defer(self(), [=](const string& failure) {
         LOG(WARNING) << "Failed to kill the task '" << taskId
                      << "' running in child container " << containerId << ": "
@@ -1257,6 +1282,22 @@ protected:
       }));
   }
 
+  void maxCompletion(const TaskID& taskId, const Duration& duration)
+  {
+    if (!containers.contains(taskId)) {
+      return;
+    }
+
+    LOG(INFO) << "Killing task " << taskId
+              << " which exceeded its maximum completion time of " << duration;
+
+    Container* container = containers.at(taskId).get();
+    container->maxCompletionTimer = None();
+    container->killedByCompletionTimeout = true;
+    // Use a zero grace period to kill the container.
+    kill(container, Duration::zero());
+  }
+
   void taskCheckUpdated(
       const TaskID& taskId,
       const CheckStatusInfo& checkStatus)

Reply via email to