Repository: mesos Updated Branches: refs/heads/master a6d6df552 -> afe886c1d
Refactored `Checker` to allow sharing code with `HealthChecker`. Move `CheckerProcess` to its own files and changed the signature of the callback. `Checker` now wraps a `CheckerProcess` and gets check status updates via a callback. It should be able to easily refactor `HealthChecker` to wrap the same actor. Review: https://reviews.apache.org/r/59872/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/00bd7df7 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/00bd7df7 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/00bd7df7 Branch: refs/heads/master Commit: 00bd7df78641db9c2b7673a8e39535b221369834 Parents: bda488d Author: Gastón Kleiman <gas...@mesosphere.io> Authored: Fri Jun 9 14:59:56 2017 +0200 Committer: Alexander Rukletsov <al...@apache.org> Committed: Fri Jun 9 15:43:56 2017 +0200 ---------------------------------------------------------------------- src/CMakeLists.txt | 1 + src/Makefile.am | 2 + src/checks/checker.cpp | 1177 +++-------------------------------- src/checks/checker.hpp | 27 +- src/checks/checker_process.cpp | 1021 ++++++++++++++++++++++++++++++ src/checks/checker_process.hpp | 148 +++++ 6 files changed, 1282 insertions(+), 1094 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/00bd7df7/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d8a78f4..43982f3 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -276,6 +276,7 @@ set(HDFS_SRC set(HEALTH_CHECK_SRC checks/checker.cpp + checks/checker_process.cpp checks/health_checker.cpp ) http://git-wip-us.apache.org/repos/asf/mesos/blob/00bd7df7/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index ea76312..e9c8b87 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -892,6 +892,7 @@ libmesos_no_3rdparty_la_SOURCES += \ authorizer/authorizer.cpp \ authorizer/local/authorizer.cpp \ checks/checker.cpp \ + checks/checker_process.cpp \ checks/health_checker.cpp \ common/attributes.cpp \ common/command_utils.cpp \ @@ -1023,6 +1024,7 @@ libmesos_no_3rdparty_la_SOURCES += \ authentication/cram_md5/auxprop.hpp \ authorizer/local/authorizer.hpp \ checks/checker.hpp \ + checks/checker_process.hpp \ checks/health_checker.hpp \ common/build.hpp \ common/command_utils.hpp \ http://git-wip-us.apache.org/repos/asf/mesos/blob/00bd7df7/src/checks/checker.cpp ---------------------------------------------------------------------- diff --git a/src/checks/checker.cpp b/src/checks/checker.cpp index dcc3164..58d02a3 100644 --- a/src/checks/checker.cpp +++ b/src/checks/checker.cpp @@ -17,11 +17,7 @@ #include "checks/checker.hpp" #include <cstdint> -#include <iterator> -#include <map> -#include <memory> #include <string> -#include <tuple> #include <vector> #include <glog/logging.h> @@ -29,206 +25,62 @@ #include <mesos/mesos.hpp> #include <mesos/type_utils.hpp> -#include <mesos/agent/agent.hpp> - -#include <process/collect.hpp> -#include <process/defer.hpp> -#include <process/delay.hpp> #include <process/future.hpp> -#include <process/io.hpp> -#include <process/protobuf.hpp> -#include <process/subprocess.hpp> -#include <process/time.hpp> -#include <stout/check.hpp> -#include <stout/duration.hpp> #include <stout/exit.hpp> -#include <stout/foreach.hpp> -#include <stout/jsonify.hpp> -#include <stout/nothing.hpp> #include <stout/option.hpp> #include <stout/protobuf.hpp> #include <stout/stopwatch.hpp> #include <stout/strings.hpp> #include <stout/try.hpp> -#include <stout/unreachable.hpp> #include <stout/uuid.hpp> -#include <stout/os/environment.hpp> -#include <stout/os/killtree.hpp> +#include "checks/checker_process.hpp" #include "common/http.hpp" -#include "common/protobuf_utils.hpp" #include "common/status_utils.hpp" #include "common/validation.hpp" -#include "internal/evolve.hpp" - -#ifdef __linux__ -#include "linux/ns.hpp" -#endif - namespace http = process::http; -using process::Failure; using process::Future; using process::Owned; -using process::Promise; -using process::Subprocess; -using std::map; -using std::shared_ptr; using std::string; -using std::tuple; using std::vector; namespace mesos { namespace internal { namespace checks { -#ifndef __WINDOWS__ -constexpr char HTTP_CHECK_COMMAND[] = "curl"; -constexpr char TCP_CHECK_COMMAND[] = "mesos-tcp-connect"; -#else -constexpr char HTTP_CHECK_COMMAND[] = "curl.exe"; -constexpr char TCP_CHECK_COMMAND[] = "mesos-tcp-connect.exe"; -#endif // __WINDOWS__ -constexpr char DEFAULT_HTTP_SCHEME[] = "http"; +// Creates a valid instance of `CheckStatusInfo` with the `type` set in +// accordance to the associated `CheckInfo`. +static CheckStatusInfo createEmptyCheckStatusInfo(const CheckInfo& checkInfo) { + CheckStatusInfo checkStatusInfo; + checkStatusInfo.set_type(checkInfo.type()); -// Use '127.0.0.1' instead of 'localhost', because the host -// file in some container images may not contain 'localhost'. -constexpr char DEFAULT_DOMAIN[] = "127.0.0.1"; - - -#ifdef __linux__ -// TODO(alexr): Instead of defining this ad-hoc clone function, provide a -// general solution for entring namespace in child processes, see MESOS-6184. -static pid_t cloneWithSetns( - const lambda::function<int()>& func, - const Option<pid_t>& taskPid, - const vector<string>& namespaces) -{ - return process::defaultClone([=]() -> int { - if (taskPid.isSome()) { - foreach (const string& ns, namespaces) { - Try<Nothing> setns = ns::setns(taskPid.get(), ns); - if (setns.isError()) { - // This effectively aborts the check. - LOG(FATAL) << "Failed to enter the " << ns << " namespace of task" - << " (pid: " << taskPid.get() << "): " << setns.error(); - } - - VLOG(1) << "Entered the " << ns << " namespace of task" - << " (pid: " << taskPid.get() << ") successfully"; - } + switch (checkInfo.type()) { + case CheckInfo::COMMAND: { + checkStatusInfo.mutable_command(); + break; + } + case CheckInfo::HTTP: { + checkStatusInfo.mutable_http(); + break; } + case CheckInfo::TCP: { + checkStatusInfo.mutable_tcp(); + break; + } + case CheckInfo::UNKNOWN: { + LOG(FATAL) << "Received UNKNOWN check type"; + break; + } + } - return func(); - }); + return checkStatusInfo; } -#endif - - -class CheckerProcess : public ProtobufProcess<CheckerProcess> -{ -public: - CheckerProcess( - const CheckInfo& _check, - const string& _launcherDir, - const lambda::function<void(const CheckStatusInfo&)>& _callback, - const TaskID& _taskId, - const Option<pid_t>& _taskPid, - const vector<string>& _namespaces, - const Option<ContainerID>& _taskContainerId, - const Option<http::URL>& _agentURL, - const Option<string>& _authorizationHeader, - bool _commandCheckViaAgent); - - void pause(); - void resume(); - - virtual ~CheckerProcess() {} - -protected: - void initialize() override; - void finalize() override; - -private: - void performCheck(); - void scheduleNext(const Duration& duration); - void processCheckResult( - const Stopwatch& stopwatch, - const Option<CheckStatusInfo>& result); - - Future<int> commandCheck(); - - Future<int> nestedCommandCheck(); - void _nestedCommandCheck(shared_ptr<Promise<int>> promise); - void __nestedCommandCheck( - shared_ptr<Promise<int>> promise, - http::Connection connection); - void ___nestedCommandCheck( - shared_ptr<Promise<int>> promise, - const ContainerID& checkContainerId, - const http::Response& launchResponse); - - void nestedCommandCheckFailure( - shared_ptr<Promise<int>> promise, - http::Connection connection, - ContainerID checkContainerId, - shared_ptr<bool> checkTimedOut, - const string& failure); - - Future<Option<int>> waitNestedContainer(const ContainerID& containerId); - Future<Option<int>> _waitNestedContainer( - const ContainerID& containerId, - const http::Response& httpResponse); - - void processCommandCheckResult( - const Stopwatch& stopwatch, - const Future<int>& future); - - Future<int> httpCheck(); - Future<int> _httpCheck( - const tuple<Future<Option<int>>, Future<string>, Future<string>>& t); - void processHttpCheckResult( - const Stopwatch& stopwatch, - const Future<int>& future); - - Future<bool> tcpCheck(); - Future<bool> _tcpCheck( - const tuple<Future<Option<int>>, Future<string>, Future<string>>& t); - void processTcpCheckResult( - const Stopwatch& stopwatch, - const Future<bool>& future); - - const CheckInfo check; - Duration checkDelay; - Duration checkInterval; - Duration checkTimeout; - - // Contains the binary for TCP checks. - const string launcherDir; - - const lambda::function<void(const CheckStatusInfo&)> updateCallback; - const TaskID taskId; - const Option<pid_t> taskPid; - const vector<string> namespaces; - const Option<ContainerID> taskContainerId; - const Option<http::URL> agentURL; - const Option<std::string> authorizationHeader; - const bool commandCheckViaAgent; - - Option<lambda::function<pid_t(const lambda::function<int()>&)>> clone; - - CheckStatusInfo previousCheckStatus; - bool paused; - - // Contains the ID of the most recently terminated nested container - // that was used to perform a COMMAND check. - Option<ContainerID> previousCheckContainerId; -}; Try<Owned<Checker>> Checker::create( @@ -245,19 +97,18 @@ Try<Owned<Checker>> Checker::create( return error.get(); } - Owned<CheckerProcess> process(new CheckerProcess( - check, - launcherDir, - callback, - taskId, - taskPid, - namespaces, - None(), - None(), - None(), - false)); - - return Owned<Checker>(new Checker(process)); + return Owned<Checker>( + new Checker( + check, + launcherDir, + callback, + taskId, + taskPid, + namespaces, + None(), + None(), + None(), + false)); } @@ -276,49 +127,22 @@ Try<Owned<Checker>> Checker::create( return error.get(); } - Owned<CheckerProcess> process(new CheckerProcess( - check, - launcherDir, - callback, - taskId, - None(), - {}, - taskContainerId, - agentURL, - authorizationHeader, - true)); - - return Owned<Checker>(new Checker(process)); + return Owned<Checker>( + new Checker( + check, + launcherDir, + callback, + taskId, + None(), + {}, + taskContainerId, + agentURL, + authorizationHeader, + true)); } -Checker::Checker(Owned<CheckerProcess> _process) - : process(_process) -{ - spawn(CHECK_NOTNULL(process.get())); -} - - -Checker::~Checker() -{ - terminate(process.get()); - wait(process.get()); -} - - -void Checker::pause() -{ - dispatch(process.get(), &CheckerProcess::pause); -} - - -void Checker::resume() -{ - dispatch(process.get(), &CheckerProcess::resume); -} - - -CheckerProcess::CheckerProcess( +Checker::Checker( const CheckInfo& _check, const string& _launcherDir, const lambda::function<void(const CheckStatusInfo&)>& _callback, @@ -327,902 +151,77 @@ CheckerProcess::CheckerProcess( const vector<string>& _namespaces, const Option<ContainerID>& _taskContainerId, const Option<http::URL>& _agentURL, - const Option<std::string>& _authorizationHeader, + const Option<string>& _authorizationHeader, bool _commandCheckViaAgent) - : ProcessBase(process::ID::generate("checker")), - check(_check), - launcherDir(_launcherDir), - updateCallback(_callback), + : check(_check), + callback(_callback), + name(CheckInfo::Type_Name(check.type()) + " check"), taskId(_taskId), - taskPid(_taskPid), - namespaces(_namespaces), - taskContainerId(_taskContainerId), - agentURL(_agentURL), - authorizationHeader(_authorizationHeader), - commandCheckViaAgent(_commandCheckViaAgent), - paused(false) -{ - Try<Duration> create = Duration::create(check.delay_seconds()); - CHECK_SOME(create); - checkDelay = create.get(); - - create = Duration::create(check.interval_seconds()); - CHECK_SOME(create); - checkInterval = create.get(); - - // Zero value means infinite timeout. - create = Duration::create(check.timeout_seconds()); - CHECK_SOME(create); - checkTimeout = - (create.get() > Duration::zero()) ? create.get() : Duration::max(); - - // The first check update should be sent only when a check succeeds, - // hence we should deduplicate against a corresponding "empty" result. - previousCheckStatus.set_type(check.type()); - switch (check.type()) { - case CheckInfo::COMMAND: { - previousCheckStatus.mutable_command(); - break; - } - case CheckInfo::HTTP: { - previousCheckStatus.mutable_http(); - break; - } - case CheckInfo::TCP: { - previousCheckStatus.mutable_tcp(); - break; - } - case CheckInfo::UNKNOWN: { - LOG(FATAL) << "Received UNKNOWN check type"; - break; - } - } - -#ifdef __linux__ - if (!namespaces.empty()) { - clone = lambda::bind(&cloneWithSetns, lambda::_1, taskPid, namespaces); - } -#endif -} - - -void CheckerProcess::initialize() + previousCheckStatus(createEmptyCheckStatusInfo(_check)) { VLOG(1) << "Check configuration for task '" << taskId << "':" << " '" << jsonify(JSON::Protobuf(check)) << "'"; - scheduleNext(checkDelay); -} - - -void CheckerProcess::finalize() -{ - LOG(INFO) << "Checking for task '" << taskId << "' stopped"; -} - - -void CheckerProcess::performCheck() -{ - if (paused) { - return; - } - - Stopwatch stopwatch; - stopwatch.start(); - - switch (check.type()) { - case CheckInfo::COMMAND: { - Future<int> future = commandCheckViaAgent ? nestedCommandCheck() - : commandCheck(); - future.onAny(defer( - self(), - &Self::processCommandCheckResult, stopwatch, lambda::_1)); - break; - } - case CheckInfo::HTTP: { - httpCheck().onAny(defer( - self(), - &Self::processHttpCheckResult, stopwatch, lambda::_1)); - break; - } - case CheckInfo::TCP: { - tcpCheck().onAny(defer( - self(), - &Self::processTcpCheckResult, stopwatch, lambda::_1)); - break; - } - case CheckInfo::UNKNOWN: { - LOG(FATAL) << "Received UNKNOWN check type"; - break; - } - } -} - - -void CheckerProcess::scheduleNext(const Duration& duration) -{ - CHECK(!paused); - - VLOG(1) << "Scheduling check for task '" << taskId << "' in " << duration; - - delay(duration, self(), &Self::performCheck); -} - - -void CheckerProcess::pause() -{ - if (!paused) { - VLOG(1) << "Checking for task '" << taskId << "' paused"; - - paused = true; - } -} - - -void CheckerProcess::resume() -{ - if (paused) { - VLOG(1) << "Checking for task '" << taskId << "' resumed"; - - paused = false; - - // Schedule a check immediately. - scheduleNext(Duration::zero()); - } -} - -void CheckerProcess::processCheckResult( - const Stopwatch& stopwatch, - const Option<CheckStatusInfo>& result) -{ - // `Checker` might have been paused while performing the check. - if (paused) { - LOG(INFO) << "Ignoring " << check.type() << " check result for" - << " task '" << taskId << "': checking is paused"; - return; - } - - // `result` should be some if it was possible to perform the check, - // and empty if there was a transient error. - if (result.isSome()) { - VLOG(1) << "Performed " << check.type() << " check" - << " for task '" << taskId << "' in " << stopwatch.elapsed(); - - // Trigger the callback if check info changes. - if (result.get() != previousCheckStatus) { - // We assume this is a local send, i.e., the checker library is not used - // in a binary external to the executor and hence can not exit before - // the data is sent to the executor. - updateCallback(result.get()); - previousCheckStatus = result.get(); - } - } + process.reset( + new CheckerProcess( + _check, + _launcherDir, + std::bind(&Checker::processCheckResult, this, lambda::_1), + _taskId, + _taskPid, + _namespaces, + _taskContainerId, + _agentURL, + _authorizationHeader, + _commandCheckViaAgent)); - scheduleNext(checkInterval); + spawn(process.get()); } -Future<int> CheckerProcess::commandCheck() -{ - CHECK_EQ(CheckInfo::COMMAND, check.type()); - CHECK(check.has_command()); - - const CommandInfo& command = check.command().command(); - - map<string, string> environment = os::environment(); - - foreach (const Environment::Variable& variable, - command.environment().variables()) { - environment[variable.name()] = variable.value(); - } - - // Launch the subprocess. - Try<Subprocess> s = Error("Not launched"); - - if (command.shell()) { - // Use the shell variant. - VLOG(1) << "Launching COMMAND check '" << command.value() << "'" - << " for task '" << taskId << "'"; - - s = process::subprocess( - command.value(), - Subprocess::PATH(os::DEV_NULL), - Subprocess::FD(STDERR_FILENO), - Subprocess::FD(STDERR_FILENO), - environment, - clone); - } else { - // Use the exec variant. - vector<string> argv( - std::begin(command.arguments()), std::end(command.arguments())); - - VLOG(1) << "Launching COMMAND check [" << command.value() << ", " - << strings::join(", ", argv) << "] for task '" << taskId << "'"; - - s = process::subprocess( - command.value(), - argv, - Subprocess::PATH(os::DEV_NULL), - Subprocess::FD(STDERR_FILENO), - Subprocess::FD(STDERR_FILENO), - nullptr, - environment, - clone); - } - - if (s.isError()) { - return Failure("Failed to create subprocess: " + s.error()); - } - - // TODO(alexr): Use lambda named captures for - // these cached values once it is available. - const pid_t commandPid = s->pid(); - const Duration timeout = checkTimeout; - const TaskID _taskId = taskId; - - return s->status() - .after( - timeout, - [timeout, commandPid, _taskId](Future<Option<int>> future) { - future.discard(); - - if (commandPid != -1) { - // Cleanup the external command process. - VLOG(1) << "Killing the COMMAND check process '" << commandPid - << "' for task '" << _taskId << "'"; - - os::killtree(commandPid, SIGKILL); - } - - return Failure("Command timed out after " + stringify(timeout)); - }) - .then([](const Option<int>& exitCode) -> Future<int> { - if (exitCode.isNone()) { - return Failure("Failed to reap the command process"); - } - - return exitCode.get(); - }); -} - - -Future<int> CheckerProcess::nestedCommandCheck() -{ - CHECK_EQ(CheckInfo::COMMAND, check.type()); - CHECK(check.has_command()); - CHECK_SOME(taskContainerId); - CHECK_SOME(agentURL); - - VLOG(1) << "Launching COMMAND check for task '" << taskId << "'"; - - // We don't want recoverable errors, e.g., the agent responding with - // HTTP status code 503, to trigger a check failure. - // - // The future returned by this method represents the result of a - // check. It will be set to the exit status of the check command if it - // succeeded, to a `Failure` if there was a non-transient error, and - // discarded if there was a transient error. - auto promise = std::make_shared<Promise<int>>(); - - if (previousCheckContainerId.isSome()) { - agent::Call call; - call.set_type(agent::Call::REMOVE_NESTED_CONTAINER); - - agent::Call::RemoveNestedContainer* removeContainer = - call.mutable_remove_nested_container(); - - removeContainer->mutable_container_id()->CopyFrom( - previousCheckContainerId.get()); - - http::Request request; - request.method = "POST"; - request.url = agentURL.get(); - request.body = serialize(ContentType::PROTOBUF, evolve(call)); - request.headers = {{"Accept", stringify(ContentType::PROTOBUF)}, - {"Content-Type", stringify(ContentType::PROTOBUF)}}; - - if (authorizationHeader.isSome()) { - request.headers["Authorization"] = authorizationHeader.get(); - } - - http::request(request, false) - .onFailed(defer(self(), - [this, promise](const string& failure) { - LOG(WARNING) << "Connection to remove the nested container '" - << previousCheckContainerId.get() - << "' used for the COMMAND check for task '" - << taskId << "' failed: " << failure; - - // Something went wrong while sending the request, we treat this - // as a transient failure and discard the promise. - promise->discard(); - })) - .onReady(defer(self(), [this, promise](const http::Response& response) { - if (response.code != http::Status::OK) { - // The agent was unable to remove the check container, we - // treat this as a transient failure and discard the promise. - LOG(WARNING) << "Received '" << response.status << "' (" - << response.body << ") while removing the nested" - << " container '" << previousCheckContainerId.get() - << "' used for the COMMAND check for task '" - << taskId << "'"; - - promise->discard(); - } - - previousCheckContainerId = None(); - _nestedCommandCheck(promise); - })); - } else { - _nestedCommandCheck(promise); - } - - return promise->future(); -} - - -void CheckerProcess::_nestedCommandCheck(shared_ptr<Promise<int>> promise) -{ - // TODO(alexr): Use a lambda named capture for - // this cached value once it is available. - const TaskID _taskId = taskId; - - http::connect(agentURL.get()) - .onFailed(defer(self(), [_taskId, promise](const string& failure) { - LOG(WARNING) << "Unable to establish connection with the agent to launch" - << " COMMAND check for task '" << _taskId << "'" - << ": " << failure; - - // We treat this as a transient failure. - promise->discard(); - })) - .onReady(defer(self(), &Self::__nestedCommandCheck, promise, lambda::_1)); -} - - -void CheckerProcess::__nestedCommandCheck( - shared_ptr<Promise<int>> promise, - http::Connection connection) -{ - ContainerID checkContainerId; - checkContainerId.set_value("check-" + UUID::random().toString()); - checkContainerId.mutable_parent()->CopyFrom(taskContainerId.get()); - - previousCheckContainerId = checkContainerId; - - CommandInfo command(check.command().command()); - - agent::Call call; - call.set_type(agent::Call::LAUNCH_NESTED_CONTAINER_SESSION); - - agent::Call::LaunchNestedContainerSession* launch = - call.mutable_launch_nested_container_session(); - - launch->mutable_container_id()->CopyFrom(checkContainerId); - launch->mutable_command()->CopyFrom(command); - - http::Request request; - request.method = "POST"; - request.url = agentURL.get(); - request.body = serialize(ContentType::PROTOBUF, evolve(call)); - request.headers = {{"Accept", stringify(ContentType::RECORDIO)}, - {"Message-Accept", stringify(ContentType::PROTOBUF)}, - {"Content-Type", stringify(ContentType::PROTOBUF)}}; - - if (authorizationHeader.isSome()) { - request.headers["Authorization"] = authorizationHeader.get(); - } - - // TODO(alexr): Use a lambda named capture for - // this cached value once it is available. - const Duration timeout = checkTimeout; - - auto checkTimedOut = std::make_shared<bool>(false); - - // `LAUNCH_NESTED_CONTAINER_SESSION` returns a streamed response with - // the output of the container. The agent will close the stream once - // the container has exited, or kill the container if the client - // closes the connection. - // - // We're calling `Connection::send` with `streamed = false`, so that - // it returns an HTTP response of type 'BODY' once the entire response - // is received. - // - // This means that this future will not be completed until after the - // check command has finished or the connection has been closed. - connection.send(request, false) - .after(checkTimeout, - defer(self(), - [timeout, checkTimedOut](Future<http::Response> future) { - future.discard(); - - *checkTimedOut = true; - - return Failure("Command timed out after " + stringify(timeout)); - })) - .onFailed(defer(self(), - &Self::nestedCommandCheckFailure, - promise, - connection, - checkContainerId, - checkTimedOut, - lambda::_1)) - .onReady(defer(self(), - &Self::___nestedCommandCheck, - promise, - checkContainerId, - lambda::_1)); -} - - -void CheckerProcess::___nestedCommandCheck( - shared_ptr<Promise<int>> promise, - const ContainerID& checkContainerId, - const http::Response& launchResponse) -{ - if (launchResponse.code != http::Status::OK) { - // The agent was unable to launch the check container, - // we treat this as a transient failure. - LOG(WARNING) << "Received '" << launchResponse.status << "' (" - << launchResponse.body << ") while launching COMMAND check" - << " for task '" << taskId << "'"; - - promise->discard(); - return; - } - - waitNestedContainer(checkContainerId) - .onFailed([promise](const string& failure) { - promise->fail( - "Unable to get the exit code: " + failure); - }) - .onReady([promise](const Option<int>& status) -> void { - if (status.isNone()) { - promise->fail("Unable to get the exit code"); - // TODO(gkleiman): Make sure that the following block works on Windows. - } else if (WIFSIGNALED(status.get()) && - WTERMSIG(status.get()) == SIGKILL) { - // The check container was signaled, probably because the task - // finished while the check was still in-flight, so we discard - // the result. - promise->discard(); - } else { - promise->set(status.get()); - } - }); -} - - -void CheckerProcess::nestedCommandCheckFailure( - shared_ptr<Promise<int>> promise, - http::Connection connection, - ContainerID checkContainerId, - shared_ptr<bool> checkTimedOut, - const string& failure) -{ - if (*checkTimedOut) { - // The check timed out, closing the connection will make the agent - // kill the container. - connection.disconnect(); - - // If the check delay interval is zero, we'll try to perform another - // check right after we finish processing the current timeout. - // - // We'll try to remove the container created for the check at the - // beginning of the next check. In order to prevent a failure, the - // promise should only be completed once we're sure that the - // container has terminated. - waitNestedContainer(checkContainerId) - .onAny([failure, promise](const Future<Option<int>>&) { - // We assume that once `WaitNestedContainer` returns, - // irrespective of whether the response contains a failure, the - // container will be in a terminal state, and that it will be - // possible to remove it. - // - // This means that we don't need to retry the - // `WaitNestedContainer` call. - promise->fail(failure); - }); - } else { - // The agent was not able to complete the request, discarding the - // promise signals the checker that it should retry the check. - // - // This will allow us to recover from a blip. The executor will - // pause the checker when it detects that the agent is not - // available. - LOG(WARNING) << "Connection to the agent to launch COMMAND check" - << " for task '" << taskId << "' failed: " << failure; - - promise->discard(); - } -} - - -Future<Option<int>> CheckerProcess::waitNestedContainer( - const ContainerID& containerId) -{ - agent::Call call; - call.set_type(agent::Call::WAIT_NESTED_CONTAINER); - - agent::Call::WaitNestedContainer* containerWait = - call.mutable_wait_nested_container(); - - containerWait->mutable_container_id()->CopyFrom(containerId); - - http::Request request; - request.method = "POST"; - request.url = agentURL.get(); - request.body = serialize(ContentType::PROTOBUF, evolve(call)); - request.headers = {{"Accept", stringify(ContentType::PROTOBUF)}, - {"Content-Type", stringify(ContentType::PROTOBUF)}}; - - if (authorizationHeader.isSome()) { - request.headers["Authorization"] = authorizationHeader.get(); - } - - return http::request(request, false) - .repair([containerId](const Future<http::Response>& future) { - return Failure( - "Connection to wait for check container '" + - stringify(containerId) + "' failed: " + future.failure()); - }) - .then(defer(self(), - &Self::_waitNestedContainer, containerId, lambda::_1)); -} - - -Future<Option<int>> CheckerProcess::_waitNestedContainer( - const ContainerID& containerId, - const http::Response& httpResponse) -{ - if (httpResponse.code != http::Status::OK) { - return Failure( - "Received '" + httpResponse.status + "' (" + httpResponse.body + - ") while waiting on check container '" + stringify(containerId) + "'"); - } - - Try<agent::Response> response = - deserialize<agent::Response>(ContentType::PROTOBUF, httpResponse.body); - CHECK_SOME(response); - - CHECK(response->has_wait_nested_container()); - - return ( - response->wait_nested_container().has_exit_status() - ? Option<int>(response->wait_nested_container().exit_status()) - : Option<int>::none()); -} - - -void CheckerProcess::processCommandCheckResult( - const Stopwatch& stopwatch, - const Future<int>& future) +Checker::~Checker() { - Option<CheckStatusInfo> result; - - // On Posix, `future` corresponds to termination information in the - // `stat_loc` area. On Windows, `status` is obtained via calling the - // `GetExitCodeProcess()` function. - // - // TODO(alexr): Ensure `WEXITSTATUS` family macros are no-op on Windows, - // see MESOS-7242. - if (future.isReady() && WIFEXITED(future.get())) { - const int exitCode = WEXITSTATUS(future.get()); - VLOG(1) << check.type() << " check for task '" << taskId << "'" - << " returned: " << exitCode; - - CheckStatusInfo checkStatusInfo; - checkStatusInfo.set_type(check.type()); - checkStatusInfo.mutable_command()->set_exit_code( - static_cast<int32_t>(exitCode)); - - result = checkStatusInfo; - } else if (future.isDiscarded()) { - // Check's status is currently not available due to a transient error, - // e.g., due to the agent failover, no `CheckStatusInfo` message should - // be sent to the callback. - LOG(INFO) << check.type() << " check for task '" << taskId << "' discarded"; - - result = None(); - } else { - // Check's status is currently not available, which may indicate a change - // that should be reported as an empty `CheckStatusInfo.Command` message. - LOG(WARNING) << check.type() << " check for task '" << taskId << "'" - << " failed: " << future.failure(); - - CheckStatusInfo checkStatusInfo; - checkStatusInfo.set_type(check.type()); - checkStatusInfo.mutable_command(); - - result = checkStatusInfo; - } - - processCheckResult(stopwatch, result); + terminate(process.get()); + wait(process.get()); } -Future<int> CheckerProcess::httpCheck() +void Checker::pause() { - CHECK_EQ(CheckInfo::HTTP, check.type()); - CHECK(check.has_http()); - - const CheckInfo::Http& http = check.http(); - - const string scheme = DEFAULT_HTTP_SCHEME; - const string path = http.has_path() ? http.path() : ""; - const string url = scheme + "://" + DEFAULT_DOMAIN + ":" + - stringify(http.port()) + path; - - VLOG(1) << "Launching HTTP check '" << url << "' for task '" << taskId << "'"; - - const vector<string> argv = { - HTTP_CHECK_COMMAND, - "-s", // Don't show progress meter or error messages. - "-S", // Makes curl show an error message if it fails. - "-L", // Follows HTTP 3xx redirects. - "-k", // Ignores SSL validation when scheme is https. - "-w", "%{http_code}", // Displays HTTP response code on stdout. - "-o", os::DEV_NULL, // Ignores output. - url - }; - - // TODO(alexr): Consider launching the helper binary once per task lifetime, - // see MESOS-6766. - Try<Subprocess> s = process::subprocess( - HTTP_CHECK_COMMAND, - argv, - Subprocess::PATH(os::DEV_NULL), - Subprocess::PIPE(), - Subprocess::PIPE(), - nullptr, - None(), - clone); - - if (s.isError()) { - return Failure( - "Failed to create the " + string(HTTP_CHECK_COMMAND) + - " subprocess: " + s.error()); - } - - // TODO(alexr): Use lambda named captures for - // these cached values once it is available. - const pid_t curlPid = s->pid(); - const Duration timeout = checkTimeout; - const TaskID _taskId = taskId; - - return await( - s->status(), - process::io::read(s->out().get()), - process::io::read(s->err().get())) - .after( - timeout, - [timeout, curlPid, _taskId](Future<tuple<Future<Option<int>>, - Future<string>, - Future<string>>> future) { - future.discard(); - - if (curlPid != -1) { - // Cleanup the HTTP_CHECK_COMMAND process. - VLOG(1) << "Killing the HTTP check process " << curlPid - << " for task '" << _taskId << "'"; - - os::killtree(curlPid, SIGKILL); - } - - return Failure( - string(HTTP_CHECK_COMMAND) + " timed out after " + - stringify(timeout)); - }) - .then(defer(self(), &Self::_httpCheck, lambda::_1)); + dispatch(process.get(), &CheckerProcess::pause); } -Future<int> CheckerProcess::_httpCheck( - const tuple<Future<Option<int>>, Future<string>, Future<string>>& t) +void Checker::resume() { - const Future<Option<int>>& status = std::get<0>(t); - if (!status.isReady()) { - return Failure( - "Failed to get the exit status of the " + string(HTTP_CHECK_COMMAND) + - " process: " + (status.isFailed() ? status.failure() : "discarded")); - } - - if (status->isNone()) { - return Failure( - "Failed to reap the " + string(HTTP_CHECK_COMMAND) + " process"); - } - - int exitCode = status->get(); - if (exitCode != 0) { - const Future<string>& error = std::get<2>(t); - if (!error.isReady()) { - return Failure( - string(HTTP_CHECK_COMMAND) + " returned " + - WSTRINGIFY(exitCode) + "; reading stderr failed: " + - (error.isFailed() ? error.failure() : "discarded")); - } - - return Failure( - string(HTTP_CHECK_COMMAND) + " returned " + - WSTRINGIFY(exitCode) + ": " + error.get()); - } - - const Future<string>& output = std::get<1>(t); - if (!output.isReady()) { - return Failure( - "Failed to read stdout from " + string(HTTP_CHECK_COMMAND) + ": " + - (output.isFailed() ? output.failure() : "discarded")); - } - - // Parse the output and get the HTTP status code. - Try<int> statusCode = numify<int>(output.get()); - if (statusCode.isError()) { - return Failure( - "Unexpected output from " + string(HTTP_CHECK_COMMAND) + ": " + - output.get()); - } - - return statusCode.get(); + dispatch(process.get(), &CheckerProcess::resume); } -void CheckerProcess::processHttpCheckResult( - const Stopwatch& stopwatch, - const Future<int>& future) -{ - CheckStatusInfo result; - result.set_type(check.type()); +void Checker::processCheckResult(const Try<CheckStatusInfo>& result) { + CheckStatusInfo checkStatusInfo; - if (future.isReady()) { - VLOG(1) << check.type() << " check for task '" - << taskId << "' returned: " << future.get(); + if (result.isError()) { + LOG(WARNING) << name << " for task '" << taskId << "'" + << " failed: " << result.error(); - result.mutable_http()->set_status_code(static_cast<uint32_t>(future.get())); + checkStatusInfo = createEmptyCheckStatusInfo(check); } else { - // Check's status is currently not available, which may indicate a change - // that should be reported as an empty `CheckStatusInfo.Http` message. - LOG(WARNING) << check.type() << " check for task '" << taskId << "' failed:" - << " " << (future.isFailed() ? future.failure() : "discarded"); - - result.mutable_http(); - } - - processCheckResult(stopwatch, result); -} - - -Future<bool> CheckerProcess::tcpCheck() -{ - CHECK_EQ(CheckInfo::TCP, check.type()); - CHECK(check.has_tcp()); - - // TCP_CHECK_COMMAND should be reachable. - CHECK(os::exists(launcherDir)); - - const CheckInfo::Tcp& tcp = check.tcp(); - - VLOG(1) << "Launching TCP check for task '" << taskId << "' at port " - << tcp.port(); - - const string command = path::join(launcherDir, TCP_CHECK_COMMAND); - - const vector<string> argv = { - command, - "--ip=" + stringify(DEFAULT_DOMAIN), - "--port=" + stringify(tcp.port()) - }; - - // TODO(alexr): Consider launching the helper binary once per task lifetime, - // see MESOS-6766. - Try<Subprocess> s = subprocess( - command, - argv, - Subprocess::PATH(os::DEV_NULL), - Subprocess::PIPE(), - Subprocess::PIPE(), - nullptr, - None(), - clone); - - if (s.isError()) { - return Failure( - "Failed to create the " + command + " subprocess: " + s.error()); - } - - // TODO(alexr): Use lambda named captures for - // these cached values once they are available. - pid_t commandPid = s->pid(); - const Duration timeout = checkTimeout; - const TaskID _taskId = taskId; - - return await( - s->status(), - process::io::read(s->out().get()), - process::io::read(s->err().get())) - .after( - timeout, - [timeout, commandPid, _taskId](Future<tuple<Future<Option<int>>, - Future<string>, - Future<string>>> future) - { - future.discard(); - - if (commandPid != -1) { - // Cleanup the TCP_CHECK_COMMAND process. - VLOG(1) << "Killing the TCP check process " << commandPid - << " for task '" << _taskId << "'"; - - os::killtree(commandPid, SIGKILL); - } - - return Failure( - string(TCP_CHECK_COMMAND) + " timed out after " + stringify(timeout)); - }) - .then(defer(self(), &Self::_tcpCheck, lambda::_1)); -} - - -Future<bool> CheckerProcess::_tcpCheck( - const tuple<Future<Option<int>>, Future<string>, Future<string>>& t) -{ - const Future<Option<int>>& status = std::get<0>(t); - if (!status.isReady()) { - return Failure( - "Failed to get the exit status of the " + string(TCP_CHECK_COMMAND) + - " process: " + (status.isFailed() ? status.failure() : "discarded")); + checkStatusInfo = result.get(); } - if (status->isNone()) { - return Failure( - "Failed to reap the " + string(TCP_CHECK_COMMAND) + " process"); - } - - int exitCode = status->get(); + // Trigger the callback if check info changes. + if (checkStatusInfo != previousCheckStatus) { + // We assume this is a local send, i.e., the checker library is not used + // in a binary external to the executor and hence can not exit before + // the data is sent to the executor. + callback(checkStatusInfo); - const Future<string>& commandOutput = std::get<1>(t); - if (commandOutput.isReady()) { - VLOG(1) << string(TCP_CHECK_COMMAND) << ": " << commandOutput.get(); - } - - if (exitCode != 0) { - const Future<string>& commandError = std::get<2>(t); - if (commandError.isReady()) { - VLOG(1) << string(TCP_CHECK_COMMAND) << ": " << commandError.get(); - } + previousCheckStatus = checkStatusInfo; } - - // Non-zero exit code of TCP_CHECK_COMMAND can mean configuration problem - // (e.g., bad command flag), system error (e.g., a socket cannot be - // created), or actually a failed connection. We cannot distinguish between - // these cases, hence treat all of them as connection failure. - return (exitCode == 0 ? true : false); } -void CheckerProcess::processTcpCheckResult( - const Stopwatch& stopwatch, - const Future<bool>& future) -{ - CheckStatusInfo result; - result.set_type(check.type()); - - if (future.isReady()) { - VLOG(1) << check.type() << " check for task '" - << taskId << "' returned: " << stringify(future.get()); - - result.mutable_tcp()->set_succeeded(future.get()); - } else { - // Check's status is currently not available, which may indicate a change - // that should be reported as an empty `CheckStatusInfo.Tcp` message. - LOG(WARNING) << check.type() << " check for task '" << taskId << "' failed:" - << " " << (future.isFailed() ? future.failure() : "discarded"); - - result.mutable_tcp(); - } - - processCheckResult(stopwatch, result); -} - namespace validation { Option<Error> checkInfo(const CheckInfo& checkInfo) http://git-wip-us.apache.org/repos/asf/mesos/blob/00bd7df7/src/checks/checker.hpp ---------------------------------------------------------------------- diff --git a/src/checks/checker.hpp b/src/checks/checker.hpp index bbe147f..6f65b47 100644 --- a/src/checks/checker.hpp +++ b/src/checks/checker.hpp @@ -30,13 +30,12 @@ #include <stout/option.hpp> #include <stout/try.hpp> +#include "checker_process.hpp" + namespace mesos { namespace internal { namespace checks { -// Forward declarations. -class CheckerProcess; - class Checker { public: @@ -45,7 +44,7 @@ public: * starts immediately after initialization. * * If the check is a COMMAND check, the checker will fork a process, enter - * the task's namespaces, and execute the commmand. + * the task's namespaces, and execute the command. * * @param check The protobuf message definition of a check. * @param launcherDir A directory where Mesos helper binaries are located. @@ -112,8 +111,26 @@ public: void resume(); private: - explicit Checker(process::Owned<CheckerProcess> process); + Checker( + const CheckInfo& _check, + const std::string& _launcherDir, + const lambda::function<void(const CheckStatusInfo&)>& _callback, + const TaskID& _taskId, + const Option<pid_t>& taskPid, + const std::vector<std::string>& _namespaces, + const Option<ContainerID>& _taskContainerId, + const Option<process::http::URL>& _agentURL, + const Option<std::string>& _authorizationHeader, + bool _commandCheckViaAgent); + + void processCheckResult(const Try<CheckStatusInfo>& result); + + const CheckInfo check; + const lambda::function<void(const CheckStatusInfo&)> callback; + const std::string name; + const TaskID taskId; + CheckStatusInfo previousCheckStatus; process::Owned<CheckerProcess> process; }; http://git-wip-us.apache.org/repos/asf/mesos/blob/00bd7df7/src/checks/checker_process.cpp ---------------------------------------------------------------------- diff --git a/src/checks/checker_process.cpp b/src/checks/checker_process.cpp new file mode 100644 index 0000000..fc0f3c7 --- /dev/null +++ b/src/checks/checker_process.cpp @@ -0,0 +1,1021 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "checks/checker_process.hpp" + +#include <cstdint> +#include <map> +#include <memory> +#include <string> +#include <tuple> +#include <vector> + +#include <glog/logging.h> + +#include <mesos/mesos.hpp> +#include <mesos/type_utils.hpp> + +#include <mesos/agent/agent.hpp> + +#include <process/collect.hpp> +#include <process/defer.hpp> +#include <process/delay.hpp> +#include <process/future.hpp> +#include <process/io.hpp> +#include <process/protobuf.hpp> +#include <process/subprocess.hpp> +#include <process/time.hpp> + +#include <stout/check.hpp> +#include <stout/duration.hpp> +#include <stout/foreach.hpp> +#include <stout/jsonify.hpp> +#include <stout/nothing.hpp> +#include <stout/option.hpp> +#include <stout/protobuf.hpp> +#include <stout/stopwatch.hpp> +#include <stout/strings.hpp> +#include <stout/try.hpp> +#include <stout/uuid.hpp> + +#include <stout/os/environment.hpp> +#include <stout/os/killtree.hpp> + +#include "common/http.hpp" +#include "common/protobuf_utils.hpp" +#include "common/status_utils.hpp" + +#include "internal/evolve.hpp" + +#ifdef __linux__ +#include "linux/ns.hpp" +#endif + +namespace http = process::http; + +using process::Failure; +using process::Future; +using process::Owned; +using process::Promise; +using process::Subprocess; + +using std::map; +using std::shared_ptr; +using std::string; +using std::tuple; +using std::vector; + +namespace mesos { +namespace internal { +namespace checks { + +#ifndef __WINDOWS__ +constexpr char HTTP_CHECK_COMMAND[] = "curl"; +constexpr char TCP_CHECK_COMMAND[] = "mesos-tcp-connect"; +#else +constexpr char HTTP_CHECK_COMMAND[] = "curl.exe"; +constexpr char TCP_CHECK_COMMAND[] = "mesos-tcp-connect.exe"; +#endif // __WINDOWS__ + +static const string DEFAULT_HTTP_SCHEME = "http"; + +// Use '127.0.0.1' instead of 'localhost', because the host +// file in some container images may not contain 'localhost'. +constexpr char DEFAULT_DOMAIN[] = "127.0.0.1"; + + +#ifdef __linux__ +// TODO(alexr): Instead of defining this ad-hoc clone function, provide a +// general solution for entering namespace in child processes, see MESOS-6184. +static pid_t cloneWithSetns( + const lambda::function<int()>& func, + const Option<pid_t>& taskPid, + const vector<string>& namespaces) +{ + return process::defaultClone([=]() -> int { + if (taskPid.isSome()) { + foreach (const string& ns, namespaces) { + Try<Nothing> setns = ns::setns(taskPid.get(), ns); + if (setns.isError()) { + // This effectively aborts the check. + LOG(FATAL) << "Failed to enter the " << ns << " namespace of task" + << " (pid: " << taskPid.get() << "): " << setns.error(); + } + + VLOG(1) << "Entered the " << ns << " namespace of task" + << " (pid: " << taskPid.get() << ") successfully"; + } + } + + return func(); + }); +} +#endif + + +CheckerProcess::CheckerProcess( + const CheckInfo& _check, + const string& _launcherDir, + const lambda::function<void(const Try<CheckStatusInfo>&)>& _callback, + const TaskID& _taskId, + const Option<pid_t>& _taskPid, + const vector<string>& _namespaces, + const Option<ContainerID>& _taskContainerId, + const Option<http::URL>& _agentURL, + const Option<std::string>& _authorizationHeader, + bool _commandCheckViaAgent) + : ProcessBase(process::ID::generate("checker")), + check(_check), + launcherDir(_launcherDir), + updateCallback(_callback), + taskId(_taskId), + taskPid(_taskPid), + namespaces(_namespaces), + taskContainerId(_taskContainerId), + agentURL(_agentURL), + authorizationHeader(_authorizationHeader), + commandCheckViaAgent(_commandCheckViaAgent), + paused(false) +{ + Try<Duration> create = Duration::create(check.delay_seconds()); + CHECK_SOME(create); + checkDelay = create.get(); + + create = Duration::create(check.interval_seconds()); + CHECK_SOME(create); + checkInterval = create.get(); + + // Zero value means infinite timeout. + create = Duration::create(check.timeout_seconds()); + CHECK_SOME(create); + checkTimeout = + (create.get() > Duration::zero()) ? create.get() : Duration::max(); + +#ifdef __linux__ + if (!namespaces.empty()) { + clone = lambda::bind(&cloneWithSetns, lambda::_1, taskPid, namespaces); + } +#endif +} + + +void CheckerProcess::initialize() +{ + scheduleNext(checkDelay); +} + + +void CheckerProcess::finalize() +{ + LOG(INFO) << "Checking for task '" << taskId << "' stopped"; +} + + +void CheckerProcess::performCheck() +{ + if (paused) { + return; + } + + Stopwatch stopwatch; + stopwatch.start(); + + switch (check.type()) { + case CheckInfo::COMMAND: { + Future<int> future = commandCheckViaAgent ? nestedCommandCheck() + : commandCheck(); + future.onAny(defer( + self(), + &Self::processCommandCheckResult, stopwatch, lambda::_1)); + break; + } + + case CheckInfo::HTTP: { + httpCheck().onAny(defer( + self(), + &Self::processHttpCheckResult, stopwatch, lambda::_1)); + break; + } + + case CheckInfo::TCP: { + tcpCheck().onAny(defer( + self(), + &Self::processTcpCheckResult, stopwatch, lambda::_1)); + break; + } + + case CheckInfo::UNKNOWN: { + LOG(FATAL) << "Received UNKNOWN check type"; + break; + } + } +} + + +void CheckerProcess::scheduleNext(const Duration& duration) +{ + CHECK(!paused); + + VLOG(1) << "Scheduling check for task '" << taskId << "' in " << duration; + + delay(duration, self(), &Self::performCheck); +} + + +void CheckerProcess::pause() +{ + if (!paused) { + VLOG(1) << "Checking for task '" << taskId << "' paused"; + + paused = true; + } +} + + +void CheckerProcess::resume() +{ + if (paused) { + VLOG(1) << "Checking for task '" << taskId << "' resumed"; + + paused = false; + + // Schedule a check immediately. + scheduleNext(Duration::zero()); + } +} + +void CheckerProcess::processCheckResult( + const Stopwatch& stopwatch, + const Result<CheckStatusInfo>& result) +{ + // `Checker` might have been paused while performing the check. + if (paused) { + LOG(INFO) << "Ignoring " << check.type() << " check result for" + << " task '" << taskId << "': checking is paused"; + return; + } + + // `result` will be: + // + // 1. `Some(CheckStatusInfo)` if it was possible to perform the check. + // 2. An `Error` if the check failed due to a non-transient error, + // e.g., timed out. + // 3. `None` if the check failed due to a transient error - this kind of + // failure will be silently ignored. + if (result.isSome()) { + // It was possible to perform the check. + VLOG(1) << "Performed " << check.type() << " check" + << " for task '" << taskId << "' in " << stopwatch.elapsed(); + + updateCallback(result.get()); + } else if (result.isError()) { + // The check failed due to a non-transient error. + updateCallback(Error(result.error())); + } else { + // The check failed due to a transient error. + LOG(INFO) << check.type() << " check for task '" << taskId << "' discarded"; + } + + scheduleNext(checkInterval); +} + + +Future<int> CheckerProcess::commandCheck() +{ + CHECK_EQ(CheckInfo::COMMAND, check.type()); + CHECK(check.has_command()); + + const CommandInfo& command = check.command().command(); + + map<string, string> environment = os::environment(); + + foreach (const Environment::Variable& variable, + command.environment().variables()) { + environment[variable.name()] = variable.value(); + } + + // Launch the subprocess. + Try<Subprocess> s = Error("Not launched"); + + if (command.shell()) { + // Use the shell variant. + VLOG(1) << "Launching COMMAND check '" << command.value() << "'" + << " for task '" << taskId << "'"; + + s = process::subprocess( + command.value(), + Subprocess::PATH(os::DEV_NULL), + Subprocess::FD(STDERR_FILENO), + Subprocess::FD(STDERR_FILENO), + environment, + clone); + } else { + // Use the exec variant. + vector<string> argv( + std::begin(command.arguments()), std::end(command.arguments())); + + VLOG(1) << "Launching COMMAND check [" << command.value() << ", " + << strings::join(", ", argv) << "] for task '" << taskId << "'"; + + s = process::subprocess( + command.value(), + argv, + Subprocess::PATH(os::DEV_NULL), + Subprocess::FD(STDERR_FILENO), + Subprocess::FD(STDERR_FILENO), + nullptr, + environment, + clone); + } + + if (s.isError()) { + return Failure("Failed to create subprocess: " + s.error()); + } + + // TODO(alexr): Use lambda named captures for + // these cached values once it is available. + const pid_t commandPid = s->pid(); + const Duration timeout = checkTimeout; + const TaskID _taskId = taskId; + + return s->status() + .after( + timeout, + [timeout, commandPid, _taskId](Future<Option<int>> future) { + future.discard(); + + if (commandPid != -1) { + // Cleanup the external command process. + VLOG(1) << "Killing the COMMAND check process '" << commandPid + << "' for task '" << _taskId << "'"; + + os::killtree(commandPid, SIGKILL); + } + + return Failure("Command timed out after " + stringify(timeout)); + }) + .then([](const Option<int>& exitCode) -> Future<int> { + if (exitCode.isNone()) { + return Failure("Failed to reap the command process"); + } + + return exitCode.get(); + }); +} + + +Future<int> CheckerProcess::nestedCommandCheck() +{ + CHECK_EQ(CheckInfo::COMMAND, check.type()); + CHECK(check.has_command()); + CHECK_SOME(taskContainerId); + CHECK_SOME(agentURL); + + VLOG(1) << "Launching COMMAND check for task '" << taskId << "'"; + + // We don't want recoverable errors, e.g., the agent responding with + // HTTP status code 503, to trigger a check failure. + // + // The future returned by this method represents the result of a + // check. It will be set to the exit status of the check command if it + // succeeded, to a `Failure` if there was a non-transient error, and + // discarded if there was a transient error. + auto promise = std::make_shared<Promise<int>>(); + + if (previousCheckContainerId.isSome()) { + agent::Call call; + call.set_type(agent::Call::REMOVE_NESTED_CONTAINER); + + agent::Call::RemoveNestedContainer* removeContainer = + call.mutable_remove_nested_container(); + + removeContainer->mutable_container_id()->CopyFrom( + previousCheckContainerId.get()); + + http::Request request; + request.method = "POST"; + request.url = agentURL.get(); + request.body = serialize(ContentType::PROTOBUF, evolve(call)); + request.headers = {{"Accept", stringify(ContentType::PROTOBUF)}, + {"Content-Type", stringify(ContentType::PROTOBUF)}}; + + if (authorizationHeader.isSome()) { + request.headers["Authorization"] = authorizationHeader.get(); + } + + http::request(request, false) + .onFailed(defer(self(), + [this, promise](const string& failure) { + LOG(WARNING) << "Connection to remove the nested container '" + << previousCheckContainerId.get() + << "' used for the COMMAND check for task '" + << taskId << "' failed: " << failure; + + // Something went wrong while sending the request, we treat this + // as a transient failure and discard the promise. + promise->discard(); + })) + .onReady(defer(self(), [this, promise](const http::Response& response) { + if (response.code != http::Status::OK) { + // The agent was unable to remove the check container, we + // treat this as a transient failure and discard the promise. + LOG(WARNING) << "Received '" << response.status << "' (" + << response.body << ") while removing the nested" + << " container '" << previousCheckContainerId.get() + << "' used for the COMMAND check for task '" + << taskId << "'"; + + promise->discard(); + } + + previousCheckContainerId = None(); + _nestedCommandCheck(promise); + })); + } else { + _nestedCommandCheck(promise); + } + + return promise->future(); +} + + +void CheckerProcess::_nestedCommandCheck(shared_ptr<Promise<int>> promise) +{ + // TODO(alexr): Use a lambda named capture for + // this cached value once it is available. + const TaskID _taskId = taskId; + + http::connect(agentURL.get()) + .onFailed(defer(self(), [_taskId, promise](const string& failure) { + LOG(WARNING) << "Unable to establish connection with the agent to launch" + << " COMMAND check for task '" << _taskId << "'" + << ": " << failure; + + // We treat this as a transient failure. + promise->discard(); + })) + .onReady(defer(self(), &Self::__nestedCommandCheck, promise, lambda::_1)); +} + + +void CheckerProcess::__nestedCommandCheck( + shared_ptr<Promise<int>> promise, + http::Connection connection) +{ + ContainerID checkContainerId; + checkContainerId.set_value("check-" + UUID::random().toString()); + checkContainerId.mutable_parent()->CopyFrom(taskContainerId.get()); + + previousCheckContainerId = checkContainerId; + + CommandInfo command(check.command().command()); + + agent::Call call; + call.set_type(agent::Call::LAUNCH_NESTED_CONTAINER_SESSION); + + agent::Call::LaunchNestedContainerSession* launch = + call.mutable_launch_nested_container_session(); + + launch->mutable_container_id()->CopyFrom(checkContainerId); + launch->mutable_command()->CopyFrom(command); + + http::Request request; + request.method = "POST"; + request.url = agentURL.get(); + request.body = serialize(ContentType::PROTOBUF, evolve(call)); + request.headers = {{"Accept", stringify(ContentType::RECORDIO)}, + {"Message-Accept", stringify(ContentType::PROTOBUF)}, + {"Content-Type", stringify(ContentType::PROTOBUF)}}; + + if (authorizationHeader.isSome()) { + request.headers["Authorization"] = authorizationHeader.get(); + } + + // TODO(alexr): Use a lambda named capture for + // this cached value once it is available. + const Duration timeout = checkTimeout; + + auto checkTimedOut = std::make_shared<bool>(false); + + // `LAUNCH_NESTED_CONTAINER_SESSION` returns a streamed response with + // the output of the container. The agent will close the stream once + // the container has exited, or kill the container if the client + // closes the connection. + // + // We're calling `Connection::send` with `streamed = false`, so that + // it returns an HTTP response of type 'BODY' once the entire response + // is received. + // + // This means that this future will not be completed until after the + // check command has finished or the connection has been closed. + connection.send(request, false) + .after(checkTimeout, + defer(self(), + [timeout, checkTimedOut](Future<http::Response> future) { + future.discard(); + + *checkTimedOut = true; + + return Failure("Command timed out after " + stringify(timeout)); + })) + .onFailed(defer(self(), + &Self::nestedCommandCheckFailure, + promise, + connection, + checkContainerId, + checkTimedOut, + lambda::_1)) + .onReady(defer(self(), + &Self::___nestedCommandCheck, + promise, + checkContainerId, + lambda::_1)); +} + + +void CheckerProcess::___nestedCommandCheck( + shared_ptr<Promise<int>> promise, + const ContainerID& checkContainerId, + const http::Response& launchResponse) +{ + if (launchResponse.code != http::Status::OK) { + // The agent was unable to launch the check container, + // we treat this as a transient failure. + LOG(WARNING) << "Received '" << launchResponse.status << "' (" + << launchResponse.body << ") while launching COMMAND check" + << " for task '" << taskId << "'"; + + promise->discard(); + return; + } + + waitNestedContainer(checkContainerId) + .onFailed([promise](const string& failure) { + promise->fail( + "Unable to get the exit code: " + failure); + }) + .onReady([promise](const Option<int>& status) -> void { + if (status.isNone()) { + promise->fail("Unable to get the exit code"); + // TODO(gkleiman): Make sure that the following block works on Windows. + } else if (WIFSIGNALED(status.get()) && + WTERMSIG(status.get()) == SIGKILL) { + // The check container was signaled, probably because the task + // finished while the check was still in-flight, so we discard + // the result. + promise->discard(); + } else { + promise->set(status.get()); + } + }); +} + + +void CheckerProcess::nestedCommandCheckFailure( + shared_ptr<Promise<int>> promise, + http::Connection connection, + ContainerID checkContainerId, + shared_ptr<bool> checkTimedOut, + const string& failure) +{ + if (*checkTimedOut) { + // The check timed out, closing the connection will make the agent + // kill the container. + connection.disconnect(); + + // If the check delay interval is zero, we'll try to perform another + // check right after we finish processing the current timeout. + // + // We'll try to remove the container created for the check at the + // beginning of the next check. In order to prevent a failure, the + // promise should only be completed once we're sure that the + // container has terminated. + waitNestedContainer(checkContainerId) + .onAny([failure, promise](const Future<Option<int>>&) { + // We assume that once `WaitNestedContainer` returns, + // irrespective of whether the response contains a failure, the + // container will be in a terminal state, and that it will be + // possible to remove it. + // + // This means that we don't need to retry the `WaitNestedContainer` + // call. + promise->fail(failure); + }); + } else { + // The agent was not able to complete the request, discarding the + // promise signals the checker that it should retry the check. + // + // This will allow us to recover from a blip. The executor will + // pause the checker when it detects that the agent is not + // available. + LOG(WARNING) << "Connection to the agent to launch COMMAND check" + << " for task '" << taskId << "' failed: " << failure; + + promise->discard(); + } +} + + +Future<Option<int>> CheckerProcess::waitNestedContainer( + const ContainerID& containerId) +{ + agent::Call call; + call.set_type(agent::Call::WAIT_NESTED_CONTAINER); + + agent::Call::WaitNestedContainer* containerWait = + call.mutable_wait_nested_container(); + + containerWait->mutable_container_id()->CopyFrom(containerId); + + http::Request request; + request.method = "POST"; + request.url = agentURL.get(); + request.body = serialize(ContentType::PROTOBUF, evolve(call)); + request.headers = {{"Accept", stringify(ContentType::PROTOBUF)}, + {"Content-Type", stringify(ContentType::PROTOBUF)}}; + + if (authorizationHeader.isSome()) { + request.headers["Authorization"] = authorizationHeader.get(); + } + + return http::request(request, false) + .repair([containerId](const Future<http::Response>& future) { + return Failure( + "Connection to wait for check container '" + + stringify(containerId) + "' failed: " + future.failure()); + }) + .then(defer(self(), + &Self::_waitNestedContainer, containerId, lambda::_1)); +} + + +Future<Option<int>> CheckerProcess::_waitNestedContainer( + const ContainerID& containerId, + const http::Response& httpResponse) +{ + if (httpResponse.code != http::Status::OK) { + return Failure( + "Received '" + httpResponse.status + "' (" + httpResponse.body + + ") while waiting on check container '" + stringify(containerId) + "'"); + } + + Try<agent::Response> response = + deserialize<agent::Response>(ContentType::PROTOBUF, httpResponse.body); + CHECK_SOME(response); + + CHECK(response->has_wait_nested_container()); + + return ( + response->wait_nested_container().has_exit_status() + ? Option<int>(response->wait_nested_container().exit_status()) + : Option<int>::none()); +} + + +void CheckerProcess::processCommandCheckResult( + const Stopwatch& stopwatch, + const Future<int>& future) +{ + CHECK(!future.isPending()); + + Result<CheckStatusInfo> result = None(); + + // On Posix, `future` corresponds to termination information in the + // `stat_loc` area. On Windows, `status` is obtained via calling the + // `GetExitCodeProcess()` function. + // + // TODO(alexr): Ensure `WEXITSTATUS` family macros are no-op on Windows, + // see MESOS-7242. + if (future.isReady() && WIFEXITED(future.get())) { + const int exitCode = WEXITSTATUS(future.get()); + VLOG(1) << check.type() << " check for task '" << taskId << "'" + << " returned: " << exitCode; + + CheckStatusInfo checkStatusInfo; + checkStatusInfo.set_type(check.type()); + checkStatusInfo.mutable_command()->set_exit_code( + static_cast<int32_t>(exitCode)); + + result = Result<CheckStatusInfo>(checkStatusInfo); + } else if (future.isDiscarded()) { + // Check's status is currently not available due to a transient error, + // e.g., due to the agent failover, no `CheckStatusInfo` message should + // be sent to the callback. + result = None(); + } else { + result = Result<CheckStatusInfo>(Error(future.failure())); + } + + processCheckResult(stopwatch, result); +} + + +Future<int> CheckerProcess::httpCheck() +{ + CHECK_EQ(CheckInfo::HTTP, check.type()); + CHECK(check.has_http()); + + const CheckInfo::Http& http = check.http(); + + const string scheme = DEFAULT_HTTP_SCHEME; + const string path = http.has_path() ? http.path() : ""; + const string url = scheme + "://" + DEFAULT_DOMAIN + ":" + + stringify(http.port()) + path; + + VLOG(1) << "Launching HTTP check '" << url << "' for task '" << taskId << "'"; + + const vector<string> argv = { + HTTP_CHECK_COMMAND, + "-s", // Don't show progress meter or error messages. + "-S", // Makes curl show an error message if it fails. + "-L", // Follows HTTP 3xx redirects. + "-k", // Ignores SSL validation when scheme is https. + "-w", "%{http_code}", // Displays HTTP response code on stdout. + "-o", os::DEV_NULL, // Ignores output. + url + }; + + // TODO(alexr): Consider launching the helper binary once per task lifetime, + // see MESOS-6766. + Try<Subprocess> s = process::subprocess( + HTTP_CHECK_COMMAND, + argv, + Subprocess::PATH(os::DEV_NULL), + Subprocess::PIPE(), + Subprocess::PIPE(), + nullptr, + None(), + clone); + + if (s.isError()) { + return Failure( + "Failed to create the " + string(HTTP_CHECK_COMMAND) + + " subprocess: " + s.error()); + } + + // TODO(alexr): Use lambda named captures for + // these cached values once it is available. + const pid_t curlPid = s->pid(); + const Duration timeout = checkTimeout; + const TaskID _taskId = taskId; + + return await( + s->status(), + process::io::read(s->out().get()), + process::io::read(s->err().get())) + .after( + timeout, + [timeout, curlPid, _taskId](Future<tuple<Future<Option<int>>, + Future<string>, + Future<string>>> future) { + future.discard(); + + if (curlPid != -1) { + // Cleanup the HTTP_CHECK_COMMAND process. + VLOG(1) << "Killing the HTTP check process " << curlPid + << " for task '" << _taskId << "'"; + + os::killtree(curlPid, SIGKILL); + } + + return Failure( + string(HTTP_CHECK_COMMAND) + " timed out after " + + stringify(timeout)); + }) + .then(defer(self(), &Self::_httpCheck, lambda::_1)); +} + + +Future<int> CheckerProcess::_httpCheck( + const tuple<Future<Option<int>>, Future<string>, Future<string>>& t) +{ + const Future<Option<int>>& status = std::get<0>(t); + if (!status.isReady()) { + return Failure( + "Failed to get the exit status of the " + string(HTTP_CHECK_COMMAND) + + " process: " + (status.isFailed() ? status.failure() : "discarded")); + } + + if (status->isNone()) { + return Failure( + "Failed to reap the " + string(HTTP_CHECK_COMMAND) + " process"); + } + + int exitCode = status->get(); + if (exitCode != 0) { + const Future<string>& error = std::get<2>(t); + if (!error.isReady()) { + return Failure( + string(HTTP_CHECK_COMMAND) + " returned " + + WSTRINGIFY(exitCode) + "; reading stderr failed: " + + (error.isFailed() ? error.failure() : "discarded")); + } + + return Failure( + string(HTTP_CHECK_COMMAND) + " returned " + + WSTRINGIFY(exitCode) + ": " + error.get()); + } + + const Future<string>& output = std::get<1>(t); + if (!output.isReady()) { + return Failure( + "Failed to read stdout from " + string(HTTP_CHECK_COMMAND) + ": " + + (output.isFailed() ? output.failure() : "discarded")); + } + + // Parse the output and get the HTTP status code. + Try<int> statusCode = numify<int>(output.get()); + if (statusCode.isError()) { + return Failure( + "Unexpected output from " + string(HTTP_CHECK_COMMAND) + ": " + + output.get()); + } + + return statusCode.get(); +} + + +void CheckerProcess::processHttpCheckResult( + const Stopwatch& stopwatch, + const Future<int>& future) +{ + CHECK(!future.isPending()); + + Result<CheckStatusInfo> result = None(); + + if (future.isReady()) { + VLOG(1) << check.type() << " check for task '" + << taskId << "' returned: " << future.get(); + + CheckStatusInfo checkStatusInfo; + checkStatusInfo.set_type(check.type()); + checkStatusInfo.mutable_http()->set_status_code( + static_cast<uint32_t>(future.get())); + + result = Result<CheckStatusInfo>(checkStatusInfo); + } else if (future.isDiscarded()) { + // Check's status is currently not available due to a transient error, + // e.g., due to the agent failover, no `CheckStatusInfo` message should + // be sent to the callback. + result = None(); + } else { + result = Result<CheckStatusInfo>(Error(future.failure())); + } + + processCheckResult(stopwatch, result); +} + + +Future<bool> CheckerProcess::tcpCheck() +{ + CHECK_EQ(CheckInfo::TCP, check.type()); + CHECK(check.has_tcp()); + + // TCP_CHECK_COMMAND should be reachable. + CHECK(os::exists(launcherDir)); + + const CheckInfo::Tcp& tcp = check.tcp(); + + VLOG(1) << "Launching TCP check for task '" << taskId << "' at port " + << tcp.port(); + + const string command = path::join(launcherDir, TCP_CHECK_COMMAND); + + const vector<string> argv = { + command, + "--ip=" + stringify(DEFAULT_DOMAIN), + "--port=" + stringify(tcp.port()) + }; + + // TODO(alexr): Consider launching the helper binary once per task lifetime, + // see MESOS-6766. + Try<Subprocess> s = subprocess( + command, + argv, + Subprocess::PATH(os::DEV_NULL), + Subprocess::PIPE(), + Subprocess::PIPE(), + nullptr, + None(), + clone); + + if (s.isError()) { + return Failure( + "Failed to create the " + command + " subprocess: " + s.error()); + } + + // TODO(alexr): Use lambda named captures for + // these cached values once they are available. + pid_t commandPid = s->pid(); + const Duration timeout = checkTimeout; + const TaskID _taskId = taskId; + + return await( + s->status(), + process::io::read(s->out().get()), + process::io::read(s->err().get())) + .after( + timeout, + [timeout, commandPid, _taskId](Future<tuple<Future<Option<int>>, + Future<string>, + Future<string>>> future) + { + future.discard(); + + if (commandPid != -1) { + // Cleanup the TCP_CHECK_COMMAND process. + VLOG(1) << "Killing the TCP check process " << commandPid + << " for task '" << _taskId << "'"; + + os::killtree(commandPid, SIGKILL); + } + + return Failure( + string(TCP_CHECK_COMMAND) + " timed out after " + stringify(timeout)); + }) + .then(defer(self(), &Self::_tcpCheck, lambda::_1)); +} + + +Future<bool> CheckerProcess::_tcpCheck( + const tuple<Future<Option<int>>, Future<string>, Future<string>>& t) +{ + const Future<Option<int>>& status = std::get<0>(t); + if (!status.isReady()) { + return Failure( + "Failed to get the exit status of the " + string(TCP_CHECK_COMMAND) + + " process: " + (status.isFailed() ? status.failure() : "discarded")); + } + + if (status->isNone()) { + return Failure( + "Failed to reap the " + string(TCP_CHECK_COMMAND) + " process"); + } + + int exitCode = status->get(); + + const Future<string>& commandOutput = std::get<1>(t); + if (commandOutput.isReady()) { + VLOG(1) << string(TCP_CHECK_COMMAND) << ": " << commandOutput.get(); + } + + if (exitCode != 0) { + const Future<string>& commandError = std::get<2>(t); + if (commandError.isReady()) { + VLOG(1) << string(TCP_CHECK_COMMAND) << ": " << commandError.get(); + } + } + + // Non-zero exit code of TCP_CHECK_COMMAND can mean configuration problem + // (e.g., bad command flag), system error (e.g., a socket cannot be + // created), or actually a failed connection. We cannot distinguish between + // these cases, hence treat all of them as connection failure. + return (exitCode == 0 ? true : false); +} + + +void CheckerProcess::processTcpCheckResult( + const Stopwatch& stopwatch, + const Future<bool>& future) +{ + CHECK(!future.isPending()); + + Result<CheckStatusInfo> result = None(); + + if (future.isReady()) { + VLOG(1) << check.type() << " check for task '" + << taskId << "' returned: " << stringify(future.get()); + + CheckStatusInfo checkStatusInfo; + checkStatusInfo.set_type(check.type()); + checkStatusInfo.mutable_tcp()->set_succeeded(future.get()); + + result = Result<CheckStatusInfo>(checkStatusInfo); + } else if (future.isDiscarded()) { + // Check's status is currently not available due to a transient error, + // e.g., due to the agent failover, no `CheckStatusInfo` message should + // be sent to the callback. + result = None(); + } else { + result = Result<CheckStatusInfo>(Error(future.failure())); + } + + processCheckResult(stopwatch, result); +} + +} // namespace checks { +} // namespace internal { +} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/00bd7df7/src/checks/checker_process.hpp ---------------------------------------------------------------------- diff --git a/src/checks/checker_process.hpp b/src/checks/checker_process.hpp new file mode 100644 index 0000000..5aa2ba0 --- /dev/null +++ b/src/checks/checker_process.hpp @@ -0,0 +1,148 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef __CHECKER_PROCESS_HPP__ +#define __CHECKER_PROCESS_HPP__ + +#include <memory> +#include <string> +#include <tuple> +#include <vector> + +#include <mesos/mesos.hpp> + +#include <process/future.hpp> +#include <process/http.hpp> +#include <process/protobuf.hpp> + +#include <stout/duration.hpp> +#include <stout/option.hpp> +#include <stout/stopwatch.hpp> +#include <stout/try.hpp> + +namespace mesos { +namespace internal { +namespace checks { + +class CheckerProcess : public ProtobufProcess<CheckerProcess> +{ +public: + CheckerProcess( + const CheckInfo& _check, + const std::string& _launcherDir, + const lambda::function<void(const Try<CheckStatusInfo>&)>& _callback, + const TaskID& _taskId, + const Option<pid_t>& _taskPid, + const std::vector<std::string>& _namespaces, + const Option<ContainerID>& _taskContainerId, + const Option<process::http::URL>& _agentURL, + const Option<std::string>& _authorizationHeader, + bool _commandCheckViaAgent); + + void pause(); + void resume(); + + virtual ~CheckerProcess() {} + +protected: + void initialize() override; + void finalize() override; + +private: + void performCheck(); + void scheduleNext(const Duration& duration); + void processCheckResult( + const Stopwatch& stopwatch, + const Result<CheckStatusInfo>& result); + + process::Future<int> commandCheck(); + + process::Future<int> nestedCommandCheck(); + void _nestedCommandCheck(std::shared_ptr<process::Promise<int>> promise); + void __nestedCommandCheck( + std::shared_ptr<process::Promise<int>> promise, + process::http::Connection connection); + void ___nestedCommandCheck( + std::shared_ptr<process::Promise<int>> promise, + const ContainerID& checkContainerId, + const process::http::Response& launchResponse); + + void nestedCommandCheckFailure( + std::shared_ptr<process::Promise<int>> promise, + process::http::Connection connection, + ContainerID checkContainerId, + std::shared_ptr<bool> checkTimedOut, + const std::string& failure); + + process::Future<Option<int>> waitNestedContainer( + const ContainerID& containerId); + process::Future<Option<int>> _waitNestedContainer( + const ContainerID& containerId, + const process::http::Response& httpResponse); + + void processCommandCheckResult( + const Stopwatch& stopwatch, + const process::Future<int>& future); + + process::Future<int> httpCheck(); + process::Future<int> _httpCheck( + const std::tuple<process::Future<Option<int>>, + process::Future<std::string>, + process::Future<std::string>>& t); + void processHttpCheckResult( + const Stopwatch& stopwatch, + const process::Future<int>& future); + + process::Future<bool> tcpCheck(); + process::Future<bool> _tcpCheck( + const std::tuple<process::Future<Option<int>>, + process::Future<std::string>, + process::Future<std::string>>& t); + void processTcpCheckResult( + const Stopwatch& stopwatch, + const process::Future<bool>& future); + + const CheckInfo check; + Duration checkDelay; + Duration checkInterval; + Duration checkTimeout; + + // Contains the binary for TCP checks. + const std::string launcherDir; + + const lambda::function<void(const Try<CheckStatusInfo>&)> updateCallback; + const TaskID taskId; + const Option<pid_t> taskPid; + const std::vector<std::string> namespaces; + const Option<ContainerID> taskContainerId; + const Option<process::http::URL> agentURL; + const Option<std::string> authorizationHeader; + const bool commandCheckViaAgent; + + Option<lambda::function<pid_t(const lambda::function<int()>&)>> clone; + + bool paused; + + // Contains the ID of the most recently terminated nested container + // that was used to perform a COMMAND check. + Option<ContainerID> previousCheckContainerId; +}; + +} // namespace checks { +} // namespace internal { +} // namespace mesos { + +#endif // __CHECKER_PROCESS_HPP__