Updated http_command_executor.cpp to use the v1 API.

Review: https://reviews.apache.org/r/44424/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bec4d711
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bec4d711
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bec4d711

Branch: refs/heads/master
Commit: bec4d711c3fc190138d100023bba9e3fe087052e
Parents: ed30403
Author: Qian Zhang <zhang...@cn.ibm.com>
Authored: Wed Apr 13 15:55:25 2016 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Wed Apr 13 16:27:39 2016 -0700

----------------------------------------------------------------------
 src/launcher/http_command_executor.cpp | 539 +++++++++++++++-------------
 1 file changed, 295 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bec4d711/src/launcher/http_command_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/http_command_executor.cpp b/src/launcher/http_command_executor.cpp
index 7677391..ad484e0 100644
--- a/src/launcher/http_command_executor.cpp
+++ b/src/launcher/http_command_executor.cpp
@@ -24,7 +24,9 @@
 #include <string>
 #include <vector>
 
-#include <mesos/executor.hpp>
+#include <mesos/v1/executor.hpp>
+#include <mesos/v1/mesos.hpp>
+
 #include <mesos/type_utils.hpp>
 
 #include <process/defer.hpp>
@@ -41,6 +43,7 @@
 #include <stout/flags.hpp>
 #include <stout/json.hpp>
 #include <stout/lambda.hpp>
+#include <stout/linkedhashmap.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
@@ -50,6 +53,8 @@
 #include "common/http.hpp"
 #include "common/status_utils.hpp"
 
+#include "internal/evolve.hpp"
+
 #ifdef __linux__
 #include "linux/fs.hpp"
 #endif
@@ -60,101 +65,230 @@
 
 #include "slave/constants.hpp"
 
-using namespace mesos::internal::slave;
+#ifdef __linux__
+namespace fs = mesos::internal::fs;
+#endif
 
-using process::wait; // Necessary on some OS's to disambiguate.
+using namespace mesos::internal::slave;
 
 using std::cout;
 using std::cerr;
 using std::endl;
+using std::queue;
 using std::string;
 using std::vector;
 
+using process::Clock;
+using process::Future;
+using process::Owned;
+using process::Subprocess;
+using process::Timer;
+
+using mesos::internal::evolve;
+using mesos::internal::TaskHealthStatus;
+
+using mesos::v1::ExecutorID;
+using mesos::v1::FrameworkID;
+
+using mesos::v1::executor::Call;
+using mesos::v1::executor::Event;
+using mesos::v1::executor::Mesos;
+
 namespace mesos {
+namespace v1 {
 namespace internal {
 
-using namespace process;
-
-class CommandExecutorProcess : public ProtobufProcess<CommandExecutorProcess>
+class HttpCommandExecutor: public ProtobufProcess<HttpCommandExecutor>
 {
 public:
-  CommandExecutorProcess(
-      const Option<char**>& override,
+  HttpCommandExecutor(
+      const Option<char**>& _override,
       const string& _healthCheckDir,
       const Option<string>& _sandboxDirectory,
       const Option<string>& _workingDirectory,
       const Option<string>& _user,
       const Option<string>& _taskCommand,
+      const FrameworkID& _frameworkId,
+      const ExecutorID& _executorId,
       const Duration& _shutdownGracePeriod)
-    : state(REGISTERING),
-      launched(false),
-      killed(false),
-      killedByHealthCheck(false),
-      pid(-1),
-      healthPid(-1),
-      shutdownGracePeriod(_shutdownGracePeriod),
-      driver(None()),
-      frameworkInfo(None()),
-      taskId(None()),
-      healthCheckDir(_healthCheckDir),
-      override(override),
-      sandboxDirectory(_sandboxDirectory),
-      workingDirectory(_workingDirectory),
-      user(_user),
-      taskCommand(_taskCommand) {}
-
-  virtual ~CommandExecutorProcess() {}
-
-  void registered(
-      ExecutorDriver* _driver,
-      const ExecutorInfo& _executorInfo,
-      const FrameworkInfo& _frameworkInfo,
-      const SlaveInfo& slaveInfo)
+  : state(DISCONNECTED),
+    launched(false),
+    killed(false),
+    killedByHealthCheck(false),
+    pid(-1),
+    healthPid(-1),
+    shutdownGracePeriod(_shutdownGracePeriod),
+    frameworkInfo(None()),
+    taskId(None()),
+    healthCheckDir(_healthCheckDir),
+    override(_override),
+    sandboxDirectory(_sandboxDirectory),
+    workingDirectory(_workingDirectory),
+    user(_user),
+    taskCommand(_taskCommand),
+    frameworkId(_frameworkId),
+    executorId(_executorId),
+    task(None()) {}
+
+  virtual ~HttpCommandExecutor() = default;
+
+  void connected()
   {
-    CHECK_EQ(REGISTERING, state);
+    state = CONNECTED;
+
+    doReliableRegistration();
+  }
+
+  void disconnected()
+  {
+    state = DISCONNECTED;
+  }
+
+  void received(queue<Event> events)
+  {
+    while (!events.empty()) {
+      Event event = events.front();
+      events.pop();
+
+      cout << "Received " << event.type() << " event" << endl;
+
+      switch (event.type()) {
+        case Event::SUBSCRIBED: {
+          cout << "Subscribed executor on "
+               << event.subscribed().agent_info().hostname() << endl;
+
+          frameworkInfo = event.subscribed().framework_info();
+          state = SUBSCRIBED;
+          break;
+        }
 
-    cout << "Registered executor on " << slaveInfo.hostname() << endl;
+        case Event::LAUNCH: {
+          launch(event.launch().task());
+          break;
+        }
 
-    driver = _driver;
-    frameworkInfo = _frameworkInfo;
+        case Event::KILL: {
+          kill(event.kill().task_id());
+          break;
+        }
 
-    state = REGISTERED;
+        case Event::ACKNOWLEDGED: {
+          // Remove the corresponding update.
+          updates.erase(UUID::fromBytes(event.acknowledged().uuid()));
+
+          // Remove the corresponding task.
+          task = None();
+          break;
+        }
+
+        case Event::SHUTDOWN: {
+          shutdown();
+          break;
+        }
+
+        case Event::MESSAGE: {
+          break;
+        }
+
+        case Event::ERROR: {
+          cerr << "Error: " << event.error().message() << endl;
+        }
+
+        default: {
+          UNREACHABLE();
+        }
+      }
+    }
   }
 
-  void reregistered(
-      ExecutorDriver* driver,
-      const SlaveInfo& slaveInfo)
+protected:
+  virtual void initialize()
   {
-    CHECK(state == REGISTERED || state == REGISTERING) << state;
+    // TODO(qianzhang): Currently, the `mesos-health-check` binary can only
+    // send unversioned `TaskHealthStatus` messages. This needs to be revisited
+    // as part of MESOS-5103.
+    install<TaskHealthStatus>(
+        &HttpCommandExecutor::taskHealthUpdated,
+        &TaskHealthStatus::task_id,
+        &TaskHealthStatus::healthy,
+        &TaskHealthStatus::kill_task);
 
-    cout << "Re-registered executor on " << slaveInfo.hostname() << endl;
+    // We initialize the library here to ensure that callbacks are only invoked
+    // after the process has spawned.
+    mesos.reset(new Mesos(
+        mesos::ContentType::PROTOBUF,
+        defer(self(), &Self::connected),
+        defer(self(), &Self::disconnected),
+        defer(self(), &Self::received, lambda::_1)));
+  }
 
-    state = REGISTERED;
+  void taskHealthUpdated(
+      const mesos::TaskID& taskID,
+      const bool healthy,
+      const bool initiateTaskKill)
+  {
+    cout << "Received task health update, healthy: "
+         << stringify(healthy) << endl;
+
+    update(evolve(taskID), TASK_RUNNING, healthy);
+
+    if (initiateTaskKill) {
+      killedByHealthCheck = true;
+      kill(evolve(taskID));
+    }
   }
 
-  void disconnected(ExecutorDriver* driver) {}
+  void doReliableRegistration()
+  {
+    if (state == SUBSCRIBED || state == DISCONNECTED) {
+      return;
+    }
+
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.mutable_executor_id()->CopyFrom(executorId);
 
-  void launchTask(ExecutorDriver* driver, const TaskInfo& task)
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+
+    // Send all unacknowledged updates.
+    foreach (const Call::Update& update, updates.values()) {
+      subscribe->add_unacknowledged_updates()->MergeFrom(update);
+    }
+
+    // Send the unacknowledged task.
+    if (task.isSome()) {
+      subscribe->add_unacknowledged_tasks()->MergeFrom(task.get());
+    }
+
+    mesos->send(call);
+
+    delay(Seconds(1), self(), &Self::doReliableRegistration);
+  }
+
+  void launch(const TaskInfo& _task)
   {
-    CHECK_EQ(REGISTERED, state);
+    CHECK_EQ(SUBSCRIBED, state);
 
     if (launched) {
-      TaskStatus status;
-      status.mutable_task_id()->MergeFrom(task.task_id());
-      status.set_state(TASK_FAILED);
-      status.set_message(
+      update(
+          _task.task_id(),
+          TASK_FAILED,
+          None(),
           "Attempted to run multiple tasks using a \"command\" executor");
-
-      driver->sendStatusUpdate(status);
       return;
     }
 
+    // Capture the task.
+    task = _task;
+
     // Capture the TaskID.
-    taskId = task.task_id();
+    taskId = task->task_id();
 
     // Capture the kill policy.
-    if (task.has_kill_policy()) {
-      killPolicy = task.kill_policy();
+    if (task->has_kill_policy()) {
+      killPolicy = task->kill_policy();
     }
 
     // Determine the command to launch the task.
@@ -175,11 +309,11 @@ public:
       }
 
       command = parse.get();
-    } else if (task.has_command()) {
-      command = task.command();
+    } else if (task->has_command()) {
+      command = task->command();
     } else {
       CHECK_SOME(override)
-        << "Expecting task '" << task.task_id()
+        << "Expecting task '" << task->task_id()
         << "' to have a command!";
     }
 
@@ -191,16 +325,16 @@ public:
       // correct solution is to perform this validation at master side.
       if (command.shell()) {
         CHECK(command.has_value())
-          << "Shell command of task '" << task.task_id()
+          << "Shell command of task '" << task->task_id()
           << "' is not specified!";
       } else {
         CHECK(command.has_value())
-          << "Executable of task '" << task.task_id()
+          << "Executable of task '" << task->task_id()
           << "' is not specified!";
       }
     }
 
-    cout << "Starting task " << task.task_id() << endl;
+    cout << "Starting task " << task->task_id() << endl;
 
     // TODO(benh): Clean this up with the new 'Fork' abstraction.
     // Use pipes to determine which child has successfully changed
@@ -448,25 +582,22 @@ public:
 
     cout << "Forked command at " << pid << endl;
 
-    if (task.has_health_check()) {
-      launchHealthCheck(task);
+    if (task->has_health_check()) {
+      launchHealthCheck(task.get());
     }
 
     // Monitor this process.
     process::reap(pid)
-      .onAny(defer(self(), &Self::reaped, driver, pid, lambda::_1));
+      .onAny(defer(self(), &Self::reaped, pid, lambda::_1));
 
-    TaskStatus status;
-    status.mutable_task_id()->MergeFrom(task.task_id());
-    status.set_state(TASK_RUNNING);
-    driver->sendStatusUpdate(status);
+    update(task->task_id(), TASK_RUNNING);
 
     launched = true;
   }
 
-  void killTask(ExecutorDriver* driver, const TaskID& taskId)
+  void kill(const TaskID& taskId)
   {
-    cout << "Received killTask for task " << taskId.value() << endl;
+    cout << "Received kill for task " << taskId.value() << endl;
 
     // Default grace period is set to 3s for backwards compatibility.
     //
@@ -478,12 +609,10 @@ public:
       gracePeriod = Nanoseconds(killPolicy->grace_period().nanoseconds());
     }
 
-    killTask(driver, taskId, gracePeriod);
+    kill(taskId, gracePeriod);
   }
 
-  void frameworkMessage(ExecutorDriver* driver, const string& data) {}
-
-  void shutdown(ExecutorDriver* driver)
+  void shutdown()
   {
     cout << "Shutting down" << endl;
 
@@ -504,53 +633,12 @@ public:
     // agent is smaller than the kill grace period).
     if (launched) {
       CHECK_SOME(taskId);
-      killTask(driver, taskId.get(), gracePeriod);
-    } else {
-      driver->stop();
-    }
-  }
-
-  virtual void error(ExecutorDriver* driver, const string& message) {}
-
-protected:
-  virtual void initialize()
-  {
-    install<TaskHealthStatus>(
-        &CommandExecutorProcess::taskHealthUpdated,
-        &TaskHealthStatus::task_id,
-        &TaskHealthStatus::healthy,
-        &TaskHealthStatus::kill_task);
-  }
-
-  void taskHealthUpdated(
-      const TaskID& taskID,
-      const bool& healthy,
-      const bool& initiateTaskKill)
-  {
-    if (driver.isNone()) {
-      return;
-    }
-
-    cout << "Received task health update, healthy: "
-         << stringify(healthy) << endl;
-
-    TaskStatus status;
-    status.mutable_task_id()->CopyFrom(taskID);
-    status.set_healthy(healthy);
-    status.set_state(TASK_RUNNING);
-    driver.get()->sendStatusUpdate(status);
-
-    if (initiateTaskKill) {
-      killedByHealthCheck = true;
-      killTask(driver.get(), taskID);
+      kill(taskId.get(), gracePeriod);
     }
   }
 
 private:
-  void killTask(
-      ExecutorDriver* driver,
-      const TaskID& _taskId,
-      const Duration& gracePeriod)
+  void kill(const TaskID& _taskId, const Duration& gracePeriod)
   {
     if (launched && !killed) {
       // Send TASK_KILLING if the framework can handle it.
@@ -561,10 +649,7 @@ private:
       foreach (const FrameworkInfo::Capability& c,
                frameworkInfo->capabilities()) {
         if (c.type() == FrameworkInfo::Capability::TASK_KILLING_STATE) {
-          TaskStatus status;
-          status.mutable_task_id()->CopyFrom(taskId.get());
-          status.set_state(TASK_KILLING);
-          driver->sendStatusUpdate(status);
+          update(taskId.get(), TASK_KILLING);
           break;
         }
       }
@@ -606,10 +691,7 @@ private:
     }
   }
 
-  void reaped(
-      ExecutorDriver* driver,
-      pid_t pid,
-      const Future<Option<int> >& status_)
+  void reaped(pid_t pid, const Future<Option<int> >& status_)
   {
     TaskState taskState;
     string message;
@@ -632,7 +714,7 @@ private:
         taskState = TASK_FINISHED;
       } else if (killed) {
         // Send TASK_KILLED if the task was killed as a result of
-        // killTask() or shutdown().
+        // kill() or shutdown().
         taskState = TASK_KILLED;
       } else {
         taskState = TASK_FAILED;
@@ -645,22 +727,17 @@ private:
 
     CHECK_SOME(taskId);
 
-    TaskStatus taskStatus;
-    taskStatus.mutable_task_id()->MergeFrom(taskId.get());
-    taskStatus.set_state(taskState);
-    taskStatus.set_message(message);
     if (killed && killedByHealthCheck) {
-      taskStatus.set_healthy(false);
+      update(taskId.get(), taskState, false, message);
+    } else {
+      update(taskId.get(), taskState, None(), message);
     }
 
-    driver->sendStatusUpdate(taskStatus);
-
-    // This is a hack to ensure the message is sent to the
-    // slave before we exit the process. Without this, we
-    // may exit before libprocess has sent the data over
-    // the socket. See MESOS-4111.
+    // TODO(qianzhang): Remove this hack since the executor now receives
+    // acknowledgements for status updates. The executor can terminate
+    // after it receives an ACK for a terminal status update.
     os::sleep(Seconds(1));
-    driver->stop();
+    terminate(self());
   }
 
   void escalated(Duration timeout)
@@ -728,10 +805,49 @@ private:
          << stringify(healthPid) << endl;
   }
 
+  void update(
+      const TaskID& taskID,
+      const TaskState& state,
+      const Option<bool>& healthy = None(),
+      const Option<string>& message = None())
+  {
+    UUID uuid = UUID::random();
+
+    TaskStatus status;
+    status.mutable_task_id()->CopyFrom(taskID);
+    status.mutable_executor_id()->CopyFrom(executorId);
+
+    status.set_state(state);
+    status.set_source(TaskStatus::SOURCE_EXECUTOR);
+    status.set_uuid(uuid.toBytes());
+
+    if (healthy.isSome()) {
+      status.set_healthy(healthy.get());
+    }
+
+    if (message.isSome()) {
+      status.set_message(message.get());
+    }
+
+    Call call;
+    call.set_type(Call::UPDATE);
+
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.mutable_executor_id()->CopyFrom(executorId);
+
+    call.mutable_update()->mutable_status()->CopyFrom(status);
+
+    // Capture the status update.
+    updates[uuid] = call.update();
+
+    mesos->send(call);
+  }
+
   enum State
   {
-    REGISTERING, // Executor is launched but not (re-)registered yet.
-    REGISTERED,  // Executor has (re-)registered.
+    CONNECTED,
+    DISCONNECTED,
+    SUBSCRIBED
   } state;
 
   bool launched;
@@ -742,7 +858,6 @@ private:
   Duration shutdownGracePeriod;
   Option<KillPolicy> killPolicy;
   Timer escalationTimer;
-  Option<ExecutorDriver*> driver;
   Option<FrameworkInfo> frameworkInfo;
   Option<TaskID> taskId;
   string healthCheckDir;
@@ -751,99 +866,15 @@ private:
   Option<string> workingDirectory;
   Option<string> user;
   Option<string> taskCommand;
-};
-
-
-class CommandExecutor: public Executor
-{
-public:
-  CommandExecutor(
-      const Option<char**>& override,
-      const string& healthCheckDir,
-      const Option<string>& sandboxDirectory,
-      const Option<string>& workingDirectory,
-      const Option<string>& user,
-      const Option<string>& taskCommand,
-      const Duration& shutdownGracePeriod)
-  {
-    process = new CommandExecutorProcess(
-        override,
-        healthCheckDir,
-        sandboxDirectory,
-        workingDirectory,
-        user,
-        taskCommand,
-        shutdownGracePeriod);
-
-    spawn(process);
-  }
-
-  virtual ~CommandExecutor()
-  {
-    terminate(process);
-    wait(process);
-    delete process;
-  }
-
-  virtual void registered(
-        ExecutorDriver* driver,
-        const ExecutorInfo& executorInfo,
-        const FrameworkInfo& frameworkInfo,
-        const SlaveInfo& slaveInfo)
-  {
-    dispatch(process,
-             &CommandExecutorProcess::registered,
-             driver,
-             executorInfo,
-             frameworkInfo,
-             slaveInfo);
-  }
-
-  virtual void reregistered(
-      ExecutorDriver* driver,
-      const SlaveInfo& slaveInfo)
-  {
-    dispatch(process,
-             &CommandExecutorProcess::reregistered,
-             driver,
-             slaveInfo);
-  }
-
-  virtual void disconnected(ExecutorDriver* driver)
-  {
-    dispatch(process, &CommandExecutorProcess::disconnected, driver);
-  }
-
-  virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
-  {
-    dispatch(process, &CommandExecutorProcess::launchTask, driver, task);
-  }
-
-  virtual void killTask(ExecutorDriver* driver, const TaskID& taskId)
-  {
-    dispatch(process, &CommandExecutorProcess::killTask, driver, taskId);
-  }
-
-  virtual void frameworkMessage(ExecutorDriver* driver, const string& data)
-  {
-    dispatch(process, &CommandExecutorProcess::frameworkMessage, driver, data);
-  }
-
-  virtual void shutdown(ExecutorDriver* driver)
-  {
-    dispatch(process, &CommandExecutorProcess::shutdown, driver);
-  }
-
-  virtual void error(ExecutorDriver* driver, const string& data)
-  {
-    dispatch(process, &CommandExecutorProcess::error, driver, data);
-  }
-
-private:
-  CommandExecutorProcess* process;
+  const FrameworkID frameworkId;
+  const ExecutorID executorId;
+  Owned<Mesos> mesos;
+  LinkedHashMap<UUID, Call::Update> updates; // Unacknowledged updates.
+  Option<TaskInfo> task; // Unacknowledged task.
 };
 
 } // namespace internal {
+} // namespace v1 {
 } // namespace mesos {
 
 
@@ -897,6 +928,8 @@ public:
 int main(int argc, char** argv)
 {
   Flags flags;
+  FrameworkID frameworkId;
+  ExecutorID executorId;
 
   // Load flags from command line.
   Try<Nothing> load = flags.load(None(), &argc, &argv);
@@ -929,6 +962,20 @@ int main(int argc, char** argv)
     ? envPath.get()
     : os::realpath(Path(argv[0]).dirname()).get();
 
+  Option<string> value = os::getenv("MESOS_FRAMEWORK_ID");
+  if (value.isNone()) {
+    EXIT(EXIT_FAILURE)
+      << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment";
+  }
+  frameworkId.set_value(value.get());
+
+  value = os::getenv("MESOS_EXECUTOR_ID");
+  if (value.isNone()) {
+    EXIT(EXIT_FAILURE)
+      << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment";
+  }
+  executorId.set_value(value.get());
+
   // Get executor shutdown grace period from the environment.
   //
   // NOTE: We avoided introducing a command executor flag for this
@@ -936,7 +983,7 @@ int main(int argc, char** argv)
   // This makes it difficult to add or remove command executor flags
   // that are unconditionally set by the agent.
   Duration shutdownGracePeriod = DEFAULT_EXECUTOR_SHUTDOWN_GRACE_PERIOD;
-  Option<string> value = os::getenv("MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD");
+  value = os::getenv("MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD");
   if (value.isSome()) {
     Try<Duration> parse = Duration::parse(value.get());
     if (parse.isError()) {
@@ -948,16 +995,20 @@ int main(int argc, char** argv)
     shutdownGracePeriod = parse.get();
   }
 
-  mesos::internal::CommandExecutor executor(
-      override,
-      path,
-      flags.sandbox_directory,
-      flags.working_directory,
-      flags.user,
-      flags.task_command,
-      shutdownGracePeriod);
-
-  mesos::MesosExecutorDriver driver(&executor);
-
-  return driver.run() == mesos::DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE;
+  Owned<mesos::v1::internal::HttpCommandExecutor> executor(
+      new mesos::v1::internal::HttpCommandExecutor(
+          override,
+          path,
+          flags.sandbox_directory,
+          flags.working_directory,
+          flags.user,
+          flags.task_command,
+          frameworkId,
+          executorId,
+          shutdownGracePeriod));
+
+  process::spawn(executor.get());
+  process::wait(executor.get());
+
+  return EXIT_SUCCESS;
 }

Reply via email to