Updated http_command_executor.cpp to use v1 API. Review: https://reviews.apache.org/r/44424/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bec4d711 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bec4d711 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bec4d711 Branch: refs/heads/master Commit: bec4d711c3fc190138d100023bba9e3fe087052e Parents: ed30403 Author: Qian Zhang <zhang...@cn.ibm.com> Authored: Wed Apr 13 15:55:25 2016 -0700 Committer: Vinod Kone <vinodk...@gmail.com> Committed: Wed Apr 13 16:27:39 2016 -0700 ---------------------------------------------------------------------- src/launcher/http_command_executor.cpp | 539 +++++++++++++++------------- 1 file changed, 295 insertions(+), 244 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/bec4d711/src/launcher/http_command_executor.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/http_command_executor.cpp b/src/launcher/http_command_executor.cpp index 7677391..ad484e0 100644 --- a/src/launcher/http_command_executor.cpp +++ b/src/launcher/http_command_executor.cpp @@ -24,7 +24,9 @@ #include <string> #include <vector> -#include <mesos/executor.hpp> +#include <mesos/v1/executor.hpp> +#include <mesos/v1/mesos.hpp> + #include <mesos/type_utils.hpp> #include <process/defer.hpp> @@ -41,6 +43,7 @@ #include <stout/flags.hpp> #include <stout/json.hpp> #include <stout/lambda.hpp> +#include <stout/linkedhashmap.hpp> #include <stout/option.hpp> #include <stout/os.hpp> #include <stout/path.hpp> @@ -50,6 +53,8 @@ #include "common/http.hpp" #include "common/status_utils.hpp" +#include "internal/evolve.hpp" + #ifdef __linux__ #include "linux/fs.hpp" #endif @@ -60,101 +65,230 @@ #include "slave/constants.hpp" -using namespace mesos::internal::slave; +#ifdef __linux__ +namespace fs = mesos::internal::fs; +#endif -using process::wait; // Necessary on some OS's to disambiguate. +using namespace mesos::internal::slave; using std::cout; using std::cerr; using std::endl; +using std::queue; using std::string; using std::vector; +using process::Clock; +using process::Future; +using process::Owned; +using process::Subprocess; +using process::Timer; + +using mesos::internal::evolve; +using mesos::internal::TaskHealthStatus; + +using mesos::v1::ExecutorID; +using mesos::v1::FrameworkID; + +using mesos::v1::executor::Call; +using mesos::v1::executor::Event; +using mesos::v1::executor::Mesos; + namespace mesos { +namespace v1 { namespace internal { -using namespace process; - -class CommandExecutorProcess : public ProtobufProcess<CommandExecutorProcess> +class HttpCommandExecutor: public ProtobufProcess<HttpCommandExecutor> { public: - CommandExecutorProcess( - const Option<char**>& override, + HttpCommandExecutor( + const Option<char**>& _override, const string& _healthCheckDir, const Option<string>& _sandboxDirectory, const Option<string>& _workingDirectory, const Option<string>& _user, const Option<string>& _taskCommand, + const FrameworkID& _frameworkId, + const ExecutorID& _executorId, const Duration& _shutdownGracePeriod) - : state(REGISTERING), - launched(false), - killed(false), - killedByHealthCheck(false), - pid(-1), - healthPid(-1), - shutdownGracePeriod(_shutdownGracePeriod), - driver(None()), - frameworkInfo(None()), - taskId(None()), - healthCheckDir(_healthCheckDir), - override(override), - sandboxDirectory(_sandboxDirectory), - workingDirectory(_workingDirectory), - user(_user), - taskCommand(_taskCommand) {} - - virtual ~CommandExecutorProcess() {} - - void registered( - ExecutorDriver* _driver, - const ExecutorInfo& _executorInfo, - const FrameworkInfo& _frameworkInfo, - const SlaveInfo& slaveInfo) + : state(DISCONNECTED), + launched(false), + killed(false), + killedByHealthCheck(false), + pid(-1), + healthPid(-1), + shutdownGracePeriod(_shutdownGracePeriod), + frameworkInfo(None()), + taskId(None()), + healthCheckDir(_healthCheckDir), + override(_override), + sandboxDirectory(_sandboxDirectory), + workingDirectory(_workingDirectory), + user(_user), + taskCommand(_taskCommand), + frameworkId(_frameworkId), + executorId(_executorId), + task(None()) {} + + virtual ~HttpCommandExecutor() = default; + + void connected() { - CHECK_EQ(REGISTERING, state); + state = CONNECTED; + + doReliableRegistration(); + } + + void disconnected() + { + state = DISCONNECTED; + } + + void received(queue<Event> events) + { + while (!events.empty()) { + Event event = events.front(); + events.pop(); + + cout << "Received " << event.type() << " event" << endl; + + switch (event.type()) { + case Event::SUBSCRIBED: { + cout << "Subscribed executor on " + << event.subscribed().agent_info().hostname() << endl; + + frameworkInfo = event.subscribed().framework_info(); + state = SUBSCRIBED; + break; + } - cout << "Registered executor on " << slaveInfo.hostname() << endl; + case Event::LAUNCH: { + launch(event.launch().task()); + break; + } - driver = _driver; - frameworkInfo = _frameworkInfo; + case Event::KILL: { + kill(event.kill().task_id()); + break; + } - state = REGISTERED; + case Event::ACKNOWLEDGED: { + // Remove the corresponding update. + updates.erase(UUID::fromBytes(event.acknowledged().uuid())); + + // Remove the corresponding task. + task = None(); + break; + } + + case Event::SHUTDOWN: { + shutdown(); + break; + } + + case Event::MESSAGE: { + break; + } + + case Event::ERROR: { + cerr << "Error: " << event.error().message() << endl; + } + + default: { + UNREACHABLE(); + } + } + } } - void reregistered( - ExecutorDriver* driver, - const SlaveInfo& slaveInfo) +protected: + virtual void initialize() { - CHECK(state == REGISTERED || state == REGISTERING) << state; + // TODO(qianzhang): Currently, the `mesos-health-check` binary can only + // send unversioned `TaskHealthStatus` messages. This needs to be revisited + // as part of MESOS-5103. + install<TaskHealthStatus>( + &HttpCommandExecutor::taskHealthUpdated, + &TaskHealthStatus::task_id, + &TaskHealthStatus::healthy, + &TaskHealthStatus::kill_task); - cout << "Re-registered executor on " << slaveInfo.hostname() << endl; + // We initialize the library here to ensure that callbacks are only invoked + // after the process has spawned. + mesos.reset(new Mesos( + mesos::ContentType::PROTOBUF, + defer(self(), &Self::connected), + defer(self(), &Self::disconnected), + defer(self(), &Self::received, lambda::_1))); + } - state = REGISTERED; + void taskHealthUpdated( + const mesos::TaskID& taskID, + const bool healthy, + const bool initiateTaskKill) + { + cout << "Received task health update, healthy: " + << stringify(healthy) << endl; + + update(evolve(taskID), TASK_RUNNING, healthy); + + if (initiateTaskKill) { + killedByHealthCheck = true; + kill(evolve(taskID)); + } } - void disconnected(ExecutorDriver* driver) {} + void doReliableRegistration() + { + if (state == SUBSCRIBED || state == DISCONNECTED) { + return; + } + + Call call; + call.set_type(Call::SUBSCRIBE); + + call.mutable_framework_id()->CopyFrom(frameworkId); + call.mutable_executor_id()->CopyFrom(executorId); - void launchTask(ExecutorDriver* driver, const TaskInfo& task) + Call::Subscribe* subscribe = call.mutable_subscribe(); + + // Send all unacknowledged updates. + foreach (const Call::Update& update, updates.values()) { + subscribe->add_unacknowledged_updates()->MergeFrom(update); + } + + // Send the unacknowledged task. + if (task.isSome()) { + subscribe->add_unacknowledged_tasks()->MergeFrom(task.get()); + } + + mesos->send(call); + + delay(Seconds(1), self(), &Self::doReliableRegistration); + } + + void launch(const TaskInfo& _task) { - CHECK_EQ(REGISTERED, state); + CHECK_EQ(SUBSCRIBED, state); if (launched) { - TaskStatus status; - status.mutable_task_id()->MergeFrom(task.task_id()); - status.set_state(TASK_FAILED); - status.set_message( + update( + _task.task_id(), + TASK_FAILED, + None(), "Attempted to run multiple tasks using a \"command\" executor"); - - driver->sendStatusUpdate(status); return; } + // Capture the task. + task = _task; + // Capture the TaskID. - taskId = task.task_id(); + taskId = task->task_id(); // Capture the kill policy. - if (task.has_kill_policy()) { - killPolicy = task.kill_policy(); + if (task->has_kill_policy()) { + killPolicy = task->kill_policy(); } // Determine the command to launch the task. @@ -175,11 +309,11 @@ public: } command = parse.get(); - } else if (task.has_command()) { - command = task.command(); + } else if (task->has_command()) { + command = task->command(); } else { CHECK_SOME(override) - << "Expecting task '" << task.task_id() + << "Expecting task '" << task->task_id() << "' to have a command!"; } @@ -191,16 +325,16 @@ public: // correct solution is to perform this validation at master side. if (command.shell()) { CHECK(command.has_value()) - << "Shell command of task '" << task.task_id() + << "Shell command of task '" << task->task_id() << "' is not specified!"; } else { CHECK(command.has_value()) - << "Executable of task '" << task.task_id() + << "Executable of task '" << task->task_id() << "' is not specified!"; } } - cout << "Starting task " << task.task_id() << endl; + cout << "Starting task " << task->task_id() << endl; // TODO(benh): Clean this up with the new 'Fork' abstraction. // Use pipes to determine which child has successfully changed @@ -448,25 +582,22 @@ public: cout << "Forked command at " << pid << endl; - if (task.has_health_check()) { - launchHealthCheck(task); + if (task->has_health_check()) { + launchHealthCheck(task.get()); } // Monitor this process. process::reap(pid) - .onAny(defer(self(), &Self::reaped, driver, pid, lambda::_1)); + .onAny(defer(self(), &Self::reaped, pid, lambda::_1)); - TaskStatus status; - status.mutable_task_id()->MergeFrom(task.task_id()); - status.set_state(TASK_RUNNING); - driver->sendStatusUpdate(status); + update(task->task_id(), TASK_RUNNING); launched = true; } - void killTask(ExecutorDriver* driver, const TaskID& taskId) + void kill(const TaskID& taskId) { - cout << "Received killTask for task " << taskId.value() << endl; + cout << "Received kill for task " << taskId.value() << endl; // Default grace period is set to 3s for backwards compatibility. // @@ -478,12 +609,10 @@ public: gracePeriod = Nanoseconds(killPolicy->grace_period().nanoseconds()); } - killTask(driver, taskId, gracePeriod); + kill(taskId, gracePeriod); } - void frameworkMessage(ExecutorDriver* driver, const string& data) {} - - void shutdown(ExecutorDriver* driver) + void shutdown() { cout << "Shutting down" << endl; @@ -504,53 +633,12 @@ public: // agent is smaller than the kill grace period). if (launched) { CHECK_SOME(taskId); - killTask(driver, taskId.get(), gracePeriod); - } else { - driver->stop(); - } - } - - virtual void error(ExecutorDriver* driver, const string& message) {} - -protected: - virtual void initialize() - { - install<TaskHealthStatus>( - &CommandExecutorProcess::taskHealthUpdated, - &TaskHealthStatus::task_id, - &TaskHealthStatus::healthy, - &TaskHealthStatus::kill_task); - } - - void taskHealthUpdated( - const TaskID& taskID, - const bool& healthy, - const bool& initiateTaskKill) - { - if (driver.isNone()) { - return; - } - - cout << "Received task health update, healthy: " - << stringify(healthy) << endl; - - TaskStatus status; - status.mutable_task_id()->CopyFrom(taskID); - status.set_healthy(healthy); - status.set_state(TASK_RUNNING); - driver.get()->sendStatusUpdate(status); - - if (initiateTaskKill) { - killedByHealthCheck = true; - killTask(driver.get(), taskID); + kill(taskId.get(), gracePeriod); } } private: - void killTask( - ExecutorDriver* driver, - const TaskID& _taskId, - const Duration& gracePeriod) + void kill(const TaskID& _taskId, const Duration& gracePeriod) { if (launched && !killed) { // Send TASK_KILLING if the framework can handle it. @@ -561,10 +649,7 @@ private: foreach (const FrameworkInfo::Capability& c, frameworkInfo->capabilities()) { if (c.type() == FrameworkInfo::Capability::TASK_KILLING_STATE) { - TaskStatus status; - status.mutable_task_id()->CopyFrom(taskId.get()); - status.set_state(TASK_KILLING); - driver->sendStatusUpdate(status); + update(taskId.get(), TASK_KILLING); break; } } @@ -606,10 +691,7 @@ private: } } - void reaped( - ExecutorDriver* driver, - pid_t pid, - const Future<Option<int> >& status_) + void reaped(pid_t pid, const Future<Option<int> >& status_) { TaskState taskState; string message; @@ -632,7 +714,7 @@ private: taskState = TASK_FINISHED; } else if (killed) { // Send TASK_KILLED if the task was killed as a result of - // killTask() or shutdown(). + // kill() or shutdown(). taskState = TASK_KILLED; } else { taskState = TASK_FAILED; @@ -645,22 +727,17 @@ private: CHECK_SOME(taskId); - TaskStatus taskStatus; - taskStatus.mutable_task_id()->MergeFrom(taskId.get()); - taskStatus.set_state(taskState); - taskStatus.set_message(message); if (killed && killedByHealthCheck) { - taskStatus.set_healthy(false); + update(taskId.get(), taskState, false, message); + } else { + update(taskId.get(), taskState, None(), message); } - driver->sendStatusUpdate(taskStatus); - - // This is a hack to ensure the message is sent to the - // slave before we exit the process. Without this, we - // may exit before libprocess has sent the data over - // the socket. See MESOS-4111. + // TODO(qianzhang): Remove this hack since the executor now receives + // acknowledgements for status updates. The executor can terminate + // after it receives an ACK for a terminal status update. os::sleep(Seconds(1)); - driver->stop(); + terminate(self()); } void escalated(Duration timeout) @@ -728,10 +805,49 @@ private: << stringify(healthPid) << endl; } + void update( + const TaskID& taskID, + const TaskState& state, + const Option<bool>& healthy = None(), + const Option<string>& message = None()) + { + UUID uuid = UUID::random(); + + TaskStatus status; + status.mutable_task_id()->CopyFrom(taskID); + status.mutable_executor_id()->CopyFrom(executorId); + + status.set_state(state); + status.set_source(TaskStatus::SOURCE_EXECUTOR); + status.set_uuid(uuid.toBytes()); + + if (healthy.isSome()) { + status.set_healthy(healthy.get()); + } + + if (message.isSome()) { + status.set_message(message.get()); + } + + Call call; + call.set_type(Call::UPDATE); + + call.mutable_framework_id()->CopyFrom(frameworkId); + call.mutable_executor_id()->CopyFrom(executorId); + + call.mutable_update()->mutable_status()->CopyFrom(status); + + // Capture the status update. + updates[uuid] = call.update(); + + mesos->send(call); + } + enum State { - REGISTERING, // Executor is launched but not (re-)registered yet. - REGISTERED, // Executor has (re-)registered. + CONNECTED, + DISCONNECTED, + SUBSCRIBED } state; bool launched; @@ -742,7 +858,6 @@ private: Duration shutdownGracePeriod; Option<KillPolicy> killPolicy; Timer escalationTimer; - Option<ExecutorDriver*> driver; Option<FrameworkInfo> frameworkInfo; Option<TaskID> taskId; string healthCheckDir; @@ -751,99 +866,15 @@ private: Option<string> workingDirectory; Option<string> user; Option<string> taskCommand; -}; - - -class CommandExecutor: public Executor -{ -public: - CommandExecutor( - const Option<char**>& override, - const string& healthCheckDir, - const Option<string>& sandboxDirectory, - const Option<string>& workingDirectory, - const Option<string>& user, - const Option<string>& taskCommand, - const Duration& shutdownGracePeriod) - { - process = new CommandExecutorProcess( - override, - healthCheckDir, - sandboxDirectory, - workingDirectory, - user, - taskCommand, - shutdownGracePeriod); - - spawn(process); - } - - virtual ~CommandExecutor() - { - terminate(process); - wait(process); - delete process; - } - - virtual void registered( - ExecutorDriver* driver, - const ExecutorInfo& executorInfo, - const FrameworkInfo& frameworkInfo, - const SlaveInfo& slaveInfo) - { - dispatch(process, - &CommandExecutorProcess::registered, - driver, - executorInfo, - frameworkInfo, - slaveInfo); - } - - virtual void reregistered( - ExecutorDriver* driver, - const SlaveInfo& slaveInfo) - { - dispatch(process, - &CommandExecutorProcess::reregistered, - driver, - slaveInfo); - } - - virtual void disconnected(ExecutorDriver* driver) - { - dispatch(process, &CommandExecutorProcess::disconnected, driver); - } - - virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task) - { - dispatch(process, &CommandExecutorProcess::launchTask, driver, task); - } - - virtual void killTask(ExecutorDriver* driver, const TaskID& taskId) - { - dispatch(process, &CommandExecutorProcess::killTask, driver, taskId); - } - - virtual void frameworkMessage(ExecutorDriver* driver, const string& data) - { - dispatch(process, &CommandExecutorProcess::frameworkMessage, driver, data); - } - - virtual void shutdown(ExecutorDriver* driver) - { - dispatch(process, &CommandExecutorProcess::shutdown, driver); - } - - virtual void error(ExecutorDriver* driver, const string& data) - { - dispatch(process, &CommandExecutorProcess::error, driver, data); - } - -private: - CommandExecutorProcess* process; + const FrameworkID frameworkId; + const ExecutorID executorId; + Owned<Mesos> mesos; + LinkedHashMap<UUID, Call::Update> updates; // Unacknowledged updates. + Option<TaskInfo> task; // Unacknowledged task. }; } // namespace internal { +} // namespace v1 { } // namespace mesos { @@ -897,6 +928,8 @@ public: int main(int argc, char** argv) { Flags flags; + FrameworkID frameworkId; + ExecutorID executorId; // Load flags from command line. Try<Nothing> load = flags.load(None(), &argc, &argv); @@ -929,6 +962,20 @@ int main(int argc, char** argv) ? envPath.get() : os::realpath(Path(argv[0]).dirname()).get(); + Option<string> value = os::getenv("MESOS_FRAMEWORK_ID"); + if (value.isNone()) { + EXIT(EXIT_FAILURE) + << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment"; + } + frameworkId.set_value(value.get()); + + value = os::getenv("MESOS_EXECUTOR_ID"); + if (value.isNone()) { + EXIT(EXIT_FAILURE) + << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment"; + } + executorId.set_value(value.get()); + // Get executor shutdown grace period from the environment. // // NOTE: We avoided introducing a command executor flag for this @@ -936,7 +983,7 @@ int main(int argc, char** argv) // This makes it difficult to add or remove command executor flags // that are unconditionally set by the agent. Duration shutdownGracePeriod = DEFAULT_EXECUTOR_SHUTDOWN_GRACE_PERIOD; - Option<string> value = os::getenv("MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD"); + value = os::getenv("MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD"); if (value.isSome()) { Try<Duration> parse = Duration::parse(value.get()); if (parse.isError()) { @@ -948,16 +995,20 @@ int main(int argc, char** argv) shutdownGracePeriod = parse.get(); } - mesos::internal::CommandExecutor executor( - override, - path, - flags.sandbox_directory, - flags.working_directory, - flags.user, - flags.task_command, - shutdownGracePeriod); - - mesos::MesosExecutorDriver driver(&executor); - - return driver.run() == mesos::DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE; + Owned<mesos::v1::internal::HttpCommandExecutor> executor( + new mesos::v1::internal::HttpCommandExecutor( + override, + path, + flags.sandbox_directory, + flags.working_directory, + flags.user, + flags.task_command, + frameworkId, + executorId, + shutdownGracePeriod)); + + process::spawn(executor.get()); + process::wait(executor.get()); + + return EXIT_SUCCESS; }