Added `max_completion_time` support to the command executor. If `TaskInfo.max_completion_time` is set, the command executor will kill the task with `SIGKILL` (i.e. with no grace period) as soon as that time has elapsed. Note that no KillPolicy will be observed. The framework should only receive a `TASK_FAILED` state with the `REASON_MAX_COMPLETION_TIME_REACHED` reason (no `TASK_KILLING` update is sent).
Review: https://reviews.apache.org/r/66259/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/197d1ea3 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/197d1ea3 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/197d1ea3 Branch: refs/heads/master Commit: 197d1ea395e76961615e30f5b03550b1a4a4e779 Parents: 95bf46b Author: Zhitao Li <zhitaoli...@gmail.com> Authored: Thu May 3 08:33:32 2018 -0700 Committer: James Peach <jpe...@apache.org> Committed: Thu May 3 08:33:32 2018 -0700 ---------------------------------------------------------------------- src/launcher/executor.cpp | 64 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 59 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/197d1ea3/src/launcher/executor.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp index 8d0869c..541ca5b 100644 --- a/src/launcher/executor.cpp +++ b/src/launcher/executor.cpp @@ -140,6 +140,7 @@ public: launched(false), killed(false), killedByHealthCheck(false), + killedByMaxCompletionTimer(false), terminated(false), pid(None()), shutdownGracePeriod(_shutdownGracePeriod), @@ -646,6 +647,21 @@ protected: launchEnvironment.add_variables()->CopyFrom(variable); } + // Setup timer for max_completion_time. 
+ if (task.max_completion_time().nanoseconds() > 0) { + Duration duration = Nanoseconds(task.max_completion_time().nanoseconds()); + + LOG(INFO) << "Task " << taskId.get() << " has a max completion time of " + << duration; + + taskCompletionTimer = delay( + duration, + self(), + &Self::taskCompletionTimeout, + task.task_id(), + duration); + } + LOG(INFO) << "Starting task " << taskId.get(); pid = launchTaskSubprocess( @@ -734,6 +750,12 @@ protected: void kill(const TaskID& _taskId, const Option<KillPolicy>& override = None()) { + // Cancel the taskCompletionTimer if it is set and ongoing. + if (taskCompletionTimer.isSome()) { + Clock::cancel(taskCompletionTimer.get()); + taskCompletionTimer = None(); + } + // Default grace period is set to 3s for backwards compatibility. // // TODO(alexr): Replace it with a more meaningful default, e.g. @@ -846,10 +868,13 @@ private: CHECK_SOME(taskId); CHECK(taskId.get() == _taskId); - if (protobuf::frameworkHasCapability( + if (!killedByMaxCompletionTimer && + protobuf::frameworkHasCapability( frameworkInfo.get(), FrameworkInfo::Capability::TASK_KILLING_STATE)) { - TaskStatus status = createTaskStatus(taskId.get(), TASK_KILLING); + TaskStatus status = + createTaskStatus(taskId.get(), TASK_KILLING); + forward(status); } @@ -915,6 +940,13 @@ private: Clock::cancel(killGracePeriodTimer.get()); } + if (taskCompletionTimer.isSome()) { + Clock::cancel(taskCompletionTimer.get()); + taskCompletionTimer = None(); + } + + Option<TaskStatus::Reason> reason = None(); + if (!status_.isReady()) { taskState = TASK_FAILED; message = @@ -928,7 +960,10 @@ private: CHECK(WIFEXITED(status) || WIFSIGNALED(status)) << "Unexpected wait status " << status; - if (killed) { + if (killedByMaxCompletionTimer) { + taskState = TASK_FAILED; + reason = TaskStatus::REASON_MAX_COMPLETION_TIME_REACHED; + } else if (killed) { // Send TASK_KILLED if the task was killed as a result of // kill() or shutdown(). 
taskState = TASK_KILLED; @@ -948,7 +983,7 @@ private: TaskStatus status = createTaskStatus( taskId.get(), taskState, - None(), + reason, message); // Indicate that a kill occurred due to a failing health check. @@ -1007,6 +1042,23 @@ private: } } + + void taskCompletionTimeout(const TaskID& taskId, const Duration& duration) + { + CHECK(!terminated); + CHECK(!killed); + + LOG(INFO) << "Killing task " << taskId + << " which exceeded its maximum completion time of " << duration; + + taskCompletionTimer = None(); + killedByMaxCompletionTimer = true; + + // Use a zero gracePeriod to kill the task. + kill(taskId, Duration::zero()); + } + + // Use this helper to create a status update from scratch, i.e., without // previously attached extra information like `data` or `check_status`. TaskStatus createTaskStatus( @@ -1130,11 +1182,13 @@ private: bool launched; bool killed; bool killedByHealthCheck; + bool killedByMaxCompletionTimer; + bool terminated; Option<Time> killGracePeriodStart; Option<Timer> killGracePeriodTimer; - + Option<Timer> taskCompletionTimer; Option<pid_t> pid; Duration shutdownGracePeriod; Option<KillPolicy> killPolicy;