Added support for `max_completion_time` in the docker executor. If `TaskInfo.max_completion_time` is set, the docker executor will kill the task as soon as that duration has elapsed after launch. We reuse the `shutdown` method to force a kill (with a zero grace period) that ignores any `KillPolicy`.
The framework should only receive a `TASK_FAILED` status update with the `REASON_MAX_COMPLETION_TIME_REACHED` reason. Review: https://reviews.apache.org/r/66283/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d86a7fed Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d86a7fed Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d86a7fed Branch: refs/heads/master Commit: d86a7fed949ebc4c51a89c4fa7adef91ac5de9dc Parents: 0278ac9 Author: Zhitao Li <zhitaoli...@gmail.com> Authored: Thu May 3 08:34:44 2018 -0700 Committer: James Peach <jpe...@apache.org> Committed: Thu May 3 08:34:44 2018 -0700 ---------------------------------------------------------------------- src/docker/executor.cpp | 68 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 64 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/d86a7fed/src/docker/executor.cpp ---------------------------------------------------------------------- diff --git a/src/docker/executor.cpp b/src/docker/executor.cpp index 1d67211..16509da 100644 --- a/src/docker/executor.cpp +++ b/src/docker/executor.cpp @@ -99,6 +99,7 @@ public: killed(false), terminated(false), killedByHealthCheck(false), + killedByTaskCompletionTimeout(false), launcherDir(launcherDir), docker(docker), containerName(containerName), @@ -159,6 +160,22 @@ public: killPolicy = task.kill_policy(); } + // Setup timer for max_completion_time. + if (task.max_completion_time().nanoseconds() > 0) { + Duration duration = Nanoseconds(task.max_completion_time().nanoseconds()); + + LOG(INFO) << "Task " << taskId.get() << " has a max completion time of " + << duration; + + taskCompletionTimer = delay( + duration, + self(), + &Self::taskCompletionTimeout, + driver, + task.task_id(), + duration); + } + LOG(INFO) << "Starting task " << taskId.get(); // Send initial TASK_STARTING update. 
@@ -387,6 +404,29 @@ public: killTask(driver, taskId, gracePeriod); } + void taskCompletionTimeout( + ExecutorDriver* driver, const TaskID& taskId, const Duration& duration) + { + if (killed) { + return; + } + + if (terminated) { + return; + } + + LOG(INFO) << "Killing task " << taskId + << " which exceeded its maximum completion time of " << duration; + + taskCompletionTimer = None(); + killedByTaskCompletionTimeout = true; + killed = true; + + // Use a zero grace period to kill the task, in order to ignore the + // `KillPolicy`. + killTask(driver, taskId, Duration::zero()); + } + void frameworkMessage(ExecutorDriver* driver, const string& data) {} void shutdown(ExecutorDriver* driver) @@ -467,6 +507,12 @@ private: return; } + // Cancel the taskCompletionTimer if it is set and ongoing. + if (taskCompletionTimer.isSome()) { + Clock::cancel(taskCompletionTimer.get()); + taskCompletionTimer = None(); + } + // Terminate if a kill task request is received before the task is launched. // This can happen, for example, if `RunTaskMessage` has not been delivered. // See MESOS-8297. @@ -522,8 +568,10 @@ private: if (!killed) { killed = true; - // Send TASK_KILLING if the framework can handle it. - if (protobuf::frameworkHasCapability( + // Send TASK_KILLING if task is not killed by completion timeout and + // the framework can handle it. 
+ if (!killedByTaskCompletionTimeout && + protobuf::frameworkHasCapability( frameworkInfo.get(), FrameworkInfo::Capability::TASK_KILLING_STATE)) { // TODO(alexr): Use `protobuf::createTaskStatus()` @@ -531,6 +579,7 @@ private: TaskStatus status; status.mutable_task_id()->CopyFrom(taskId.get()); status.set_state(TASK_KILLING); + driver.get()->sendStatusUpdate(status); } @@ -639,6 +688,7 @@ private: void _reaped(const Future<Option<int>>& run) { TaskState state; + Option<TaskStatus::Reason> reason = None(); string message; if (!run.isReady()) { @@ -654,7 +704,10 @@ private: CHECK(WIFEXITED(status) || WIFSIGNALED(status)) << "Unexpected wait status " << status; - if (killed) { + if (killedByTaskCompletionTimeout) { + state = TASK_FAILED; + reason = TaskStatus::REASON_MAX_COMPLETION_TIME_REACHED; + } else if (killed) { // Send TASK_KILLED if the task was killed as a result of // kill() or shutdown(). Note that in general there is a // race between signaling the container and it terminating @@ -683,12 +736,17 @@ private: taskStatus.mutable_task_id()->CopyFrom(taskId.get()); taskStatus.set_state(state); taskStatus.set_message(message); + if (killed && killedByHealthCheck) { // TODO(abudnik): Consider specifying appropriate status update reason, // saying that the task was killed due to a failing health check. taskStatus.set_healthy(false); } + if (reason.isSome()) { + taskStatus.set_reason(reason.get()); + } + CHECK_SOME(driver); driver.get()->sendStatusUpdate(taskStatus); @@ -777,8 +835,10 @@ private: // see MESOS-5252. bool killed; bool terminated; - bool killedByHealthCheck; + bool killedByTaskCompletionTimeout; + + Option<Timer> taskCompletionTimer; string launcherDir; Owned<Docker> docker;