Repository: mesos
Updated Branches:
  refs/heads/master b2d13bc3b -> d0046dca7


Refactored Subprocess to support execve style launch and customized
clone function.

Review: https://reviews.apache.org/r/22831


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1b0fdf01
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1b0fdf01
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1b0fdf01

Branch: refs/heads/master
Commit: 1b0fdf01b47591ebd966eba3e0df360caa40888e
Parents: b2d13bc
Author: Jie Yu <yujie....@gmail.com>
Authored: Fri Jun 20 12:28:02 2014 -0700
Committer: Jie Yu <yujie....@gmail.com>
Committed: Wed Jun 25 14:31:29 2014 -0700

----------------------------------------------------------------------
 .../libprocess/include/process/subprocess.hpp   | 101 ++++++++--
 3rdparty/libprocess/src/subprocess.cpp          | 201 ++++++++++++-------
 .../libprocess/src/tests/subprocess_tests.cpp   |  19 +-
 3 files changed, 223 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/1b0fdf01/3rdparty/libprocess/include/process/subprocess.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/subprocess.hpp 
b/3rdparty/libprocess/include/process/subprocess.hpp
index d6e2c1f..7ff5a10 100644
--- a/3rdparty/libprocess/include/process/subprocess.hpp
+++ b/3rdparty/libprocess/include/process/subprocess.hpp
@@ -7,6 +7,7 @@
 
 #include <map>
 #include <string>
+#include <vector>
 
 #include <process/future.hpp>
 
@@ -38,17 +39,25 @@ public:
   //   3. FD: Redirect to an open file descriptor.
   class IO
   {
+  public:
+    bool isPipe() const { return mode == PIPE; }
+    bool isPath() const { return mode == PATH; }
+    bool isFd() const { return mode == FD; }
+
   private:
     friend class Subprocess;
 
     friend Try<Subprocess> subprocess(
-        const std::string& command,
-        const IO& in,
-        const IO& out,
-        const IO& err,
+        const std::string& path,
+        std::vector<std::string> argv,
+        const Subprocess::IO& in,
+        const Subprocess::IO& out,
+        const Subprocess::IO& err,
         const Option<flags::FlagsBase>& flags,
         const Option<std::map<std::string, std::string> >& environment,
-        const Option<lambda::function<int()> >& setup);
+        const Option<lambda::function<int()> >& setup,
+        const Option<lambda::function<
+            pid_t(const lambda::function<int()>&)> >& clone);
 
     enum Mode
     {
@@ -95,13 +104,16 @@ public:
 
 private:
   friend Try<Subprocess> subprocess(
-      const std::string& command,
-      const IO& in,
-      const IO& out,
-      const IO& err,
+      const std::string& path,
+      std::vector<std::string> argv,
+      const Subprocess::IO& in,
+      const Subprocess::IO& out,
+      const Subprocess::IO& err,
       const Option<flags::FlagsBase>& flags,
       const Option<std::map<std::string, std::string> >& environment,
-      const Option<lambda::function<int()> >& setup);
+      const Option<lambda::function<int()> >& setup,
+      const Option<lambda::function<
+          pid_t(const lambda::function<int()>&)> >& clone);
 
   struct Data
   {
@@ -141,29 +153,86 @@ private:
 // must not contain any async unsafe code.
 // TODO(dhamon): Add an option to not combine the two environments.
 Try<Subprocess> subprocess(
-    const std::string& command,
+    const std::string& path,
+    std::vector<std::string> argv,
     const Subprocess::IO& in,
     const Subprocess::IO& out,
     const Subprocess::IO& err,
     const Option<flags::FlagsBase>& flags = None(),
     const Option<std::map<std::string, std::string> >& environment = None(),
-    const Option<lambda::function<int()> >& setup = None());
+    const Option<lambda::function<int()> >& setup = None(),
+    const Option<lambda::function<
+        pid_t(const lambda::function<int()>&)> >& clone = None());
 
 
 inline Try<Subprocess> subprocess(
-    const std::string& command,
+    const std::string& path,
+    std::vector<std::string> argv,
     const Option<flags::FlagsBase>& flags = None(),
     const Option<std::map<std::string, std::string> >& environment = None(),
-    const Option<lambda::function<int()> >& setup = None())
+    const Option<lambda::function<int()> >& setup = None(),
+    const Option<lambda::function<
+        pid_t(const lambda::function<int()>&)> >& clone = None())
 {
   return subprocess(
-      command,
+      path,
+      argv,
       Subprocess::FD(STDIN_FILENO),
       Subprocess::FD(STDOUT_FILENO),
       Subprocess::FD(STDERR_FILENO),
       flags,
       environment,
-      setup);
+      setup,
+      clone);
+}
+
+
+// Overloads for launching a shell command. Currently, we do not
+// support flags for shell command variants due to the complexity
+// involved in escaping quotes in flags.
+inline Try<Subprocess> subprocess(
+    const std::string& command,
+    const Subprocess::IO& in,
+    const Subprocess::IO& out,
+    const Subprocess::IO& err,
+    const Option<std::map<std::string, std::string> >& environment = None(),
+    const Option<lambda::function<int()> >& setup = None(),
+    const Option<lambda::function<
+        pid_t(const lambda::function<int()>&)> >& clone = None())
+{
+  std::vector<std::string> argv(3);
+  argv[0] = "sh";
+  argv[1] = "-c";
+  argv[2] = command;
+
+  return subprocess(
+      "/bin/sh",
+      argv,
+      in,
+      out,
+      err,
+      None(),
+      environment,
+      setup,
+      clone);
+}
+
+
+inline Try<Subprocess> subprocess(
+    const std::string& command,
+    const Option<std::map<std::string, std::string> >& environment = None(),
+    const Option<lambda::function<int()> >& setup = None(),
+    const Option<lambda::function<
+        pid_t(const lambda::function<int()>&)> >& clone = None())
+{
+  return subprocess(
+      command,
+      Subprocess::FD(STDIN_FILENO),
+      Subprocess::FD(STDOUT_FILENO),
+      Subprocess::FD(STDERR_FILENO),
+      environment,
+      setup,
+      clone);
 }
 
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/1b0fdf01/3rdparty/libprocess/src/subprocess.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/subprocess.cpp 
b/3rdparty/libprocess/src/subprocess.cpp
index 6ac7b5b..68bfd5d 100644
--- a/3rdparty/libprocess/src/subprocess.cpp
+++ b/3rdparty/libprocess/src/subprocess.cpp
@@ -24,6 +24,7 @@
 
 using std::map;
 using std::string;
+using std::vector;
 
 namespace process {
 namespace internal {
@@ -85,15 +86,99 @@ static Try<Nothing> cloexec(int stdinFd[2], int 
stdoutFd[2], int stderrFd[2])
 }  // namespace internal {
 
 
-// Runs the provided command in a subprocess.
+static pid_t defaultClone(const lambda::function<int()>& func)
+{
+  pid_t pid = ::fork();
+  if (pid == -1) {
+    return -1;
+  } else if (pid == 0) {
+    // Child.
+    ::exit(func());
+    return UNREACHABLE();
+  } else {
+    // Parent.
+    return pid;
+  }
+}
+
+
+// The main entry of the child process. Note that this function has to
+// be async singal safe.
+static int childMain(
+    const string& path,
+    char** argv,
+    const Subprocess::IO& in,
+    const Subprocess::IO& out,
+    const Subprocess::IO& err,
+    os::ExecEnv* envp,
+    const Option<lambda::function<int()> >& setup,
+    int stdinFd[2],
+    int stdoutFd[2],
+    int stderrFd[2])
+{
+  // Close parent's end of the pipes.
+  if (in.isPipe()) {
+    while (::close(stdinFd[1]) == -1 && errno == EINTR);
+  }
+  if (out.isPipe()) {
+    while (::close(stdoutFd[0]) == -1 && errno == EINTR);
+  }
+  if (err.isPipe()) {
+    while (::close(stderrFd[0]) == -1 && errno == EINTR);
+  }
+
+  // Redirect I/O for stdin/stdout/stderr.
+  while (::dup2(stdinFd[0], STDIN_FILENO) == -1 && errno == EINTR);
+  while (::dup2(stdoutFd[1], STDOUT_FILENO) == -1 && errno == EINTR);
+  while (::dup2(stderrFd[1], STDERR_FILENO) == -1 && errno == EINTR);
+
+  // Close the copies. We need to make sure that we do not close the
+  // file descriptor assigned to stdin/stdout/stderr in case the
+  // parent has closed stdin/stdout/stderr when calling this
+  // function (in that case, a dup'ed file descriptor may have the
+  // same file descriptor number as stdin/stdout/stderr).
+  if (stdinFd[0] != STDIN_FILENO &&
+      stdinFd[0] != STDOUT_FILENO &&
+      stdinFd[0] != STDERR_FILENO) {
+    while (::close(stdinFd[0]) == -1 && errno == EINTR);
+  }
+  if (stdoutFd[1] != STDIN_FILENO &&
+      stdoutFd[1] != STDOUT_FILENO &&
+      stdoutFd[1] != STDERR_FILENO) {
+    while (::close(stdoutFd[1]) == -1 && errno == EINTR);
+  }
+  if (stderrFd[1] != STDIN_FILENO &&
+      stderrFd[1] != STDOUT_FILENO &&
+      stderrFd[1] != STDERR_FILENO) {
+    while (::close(stderrFd[1]) == -1 && errno == EINTR);
+  }
+
+  if (setup.isSome()) {
+    int status = setup.get()();
+    if (status != 0) {
+      _exit(status);
+    }
+  }
+
+  execve(path.c_str(), argv, (*envp)());
+
+  ABORT("Failed to execve in childMain\n");
+
+  return UNREACHABLE();
+}
+
+
 Try<Subprocess> subprocess(
-    const string& _command,
+    const string& path,
+    vector<string> argv,
     const Subprocess::IO& in,
     const Subprocess::IO& out,
     const Subprocess::IO& err,
     const Option<flags::FlagsBase>& flags,
     const Option<map<string, string> >& environment,
-    const Option<lambda::function<int()> >& setup)
+    const Option<lambda::function<int()> >& setup,
+    const Option<lambda::function<
+        pid_t(const lambda::function<int()>&)> >& _clone)
 {
   // File descriptors for redirecting stdin/stdout/stderr. These file
   // descriptors are used for different purposes depending on the
@@ -116,7 +201,7 @@ Try<Subprocess> subprocess(
       break;
     }
     case Subprocess::IO::PIPE: {
-      if (pipe(stdinFd) == -1) {
+      if (::pipe(stdinFd) == -1) {
         return ErrnoError("Failed to create pipe");
       }
       break;
@@ -147,7 +232,7 @@ Try<Subprocess> subprocess(
       break;
     }
     case Subprocess::IO::PIPE: {
-      if (pipe(stdoutFd) == -1) {
+      if (::pipe(stdoutFd) == -1) {
         // Save the errno as 'close' below might overwrite it.
         ErrnoError error("Failed to create pipe");
         internal::close(stdinFd, stdoutFd, stderrFd);
@@ -186,7 +271,7 @@ Try<Subprocess> subprocess(
       break;
     }
     case Subprocess::IO::PIPE: {
-      if (pipe(stderrFd) == -1) {
+      if (::pipe(stderrFd) == -1) {
         // Save the errno as 'close' below might overwrite it.
         ErrnoError error("Failed to create pipe");
         internal::close(stdinFd, stdoutFd, stderrFd);
@@ -219,95 +304,65 @@ Try<Subprocess> subprocess(
     return Error("Failed to cloexec: " + cloexec.error());
   }
 
-  // Prepare the command to execute. If the user specifies the
-  // 'flags', we will stringify it and append it to the command.
-  string command = _command;
-
+  // Prepare the arguments. If the user specifies the 'flags', we will
+  // stringify them and append them to the existing arguments.
   if (flags.isSome()) {
     foreachpair (const string& name, const flags::Flag& flag, flags.get()) {
       Option<string> value = flag.stringify(flags.get());
       if (value.isSome()) {
-        // TODO(jieyu): Need a better way to escape quotes. For
-        // example, what if 'value.get()' contains a single quote?
-        string argument = "--" + name + "='" + value.get() + "'";
-        command = strings::join(" ", command, argument);
+        argv.push_back("--" + name + "=" + value.get());
       }
     }
   }
 
-  // We need to do this construction before doing the fork as it
+  // The real arguments that will be passed to 'execve'. We need to
+  // construct them here before doing the clone as it might not be
+  // async signal safe.
+  char** _argv = new char*[argv.size() + 1];
+  for (int i = 0; i < argv.size(); i++) {
+    _argv[i] = (char*) argv[i].c_str();
+  }
+  _argv[argv.size()] = NULL;
+
+  // We need to do this construction before doing the clone as it
   // might not be async-safe.
   // TODO(tillt): Consider optimizing this to not pass an empty map
   // into the constructor or even further to use execl instead of
   // execle once we have no user supplied environment.
   os::ExecEnv envp(environment.get(map<string, string>()));
 
-  pid_t pid;
-  if ((pid = fork()) == -1) {
+  // Determine the function to clone the child process. If the user
+  // does not specify the clone function, we will use the default.
+  lambda::function<pid_t(const lambda::function<int()>&)> clone =
+    (_clone.isSome() ? _clone.get() : defaultClone);
+
+  // Now, clone the child process.
+  pid_t pid = clone(lambda::bind(
+      &childMain,
+      path,
+      _argv,
+      in,
+      out,
+      err,
+      &envp,
+      setup,
+      stdinFd,
+      stdoutFd,
+      stderrFd));
+
+  delete[] _argv;
+
+  if (pid == -1) {
     // Save the errno as 'close' below might overwrite it.
-    ErrnoError error("Failed to fork");
+    ErrnoError error("Failed to clone");
     internal::close(stdinFd, stdoutFd, stderrFd);
     return error;
   }
 
+  // Parent.
   Subprocess process;
   process.data->pid = pid;
 
-  if (process.data->pid == 0) {
-    // Child.
-    // Close parent's end of the pipes.
-    if (in.mode == Subprocess::IO::PIPE) {
-      while (::close(stdinFd[1]) == -1 && errno == EINTR);
-    }
-    if (out.mode == Subprocess::IO::PIPE) {
-      while (::close(stdoutFd[0]) == -1 && errno == EINTR);
-    }
-    if (err.mode == Subprocess::IO::PIPE) {
-      while (::close(stderrFd[0]) == -1 && errno == EINTR);
-    }
-
-    // Redirect I/O for stdin/stdout/stderr.
-    while (::dup2(stdinFd[0], STDIN_FILENO) == -1 && errno == EINTR);
-    while (::dup2(stdoutFd[1], STDOUT_FILENO) == -1 && errno == EINTR);
-    while (::dup2(stderrFd[1], STDERR_FILENO) == -1 && errno == EINTR);
-
-    // Close the copies. We need to make sure that we do not close the
-    // file descriptor assigned to stdin/stdout/stderr in case the
-    // parent has closed stdin/stdout/stderr when calling this
-    // function (in that case, a dup'ed file descriptor may have the
-    // same file descriptor number as stdin/stdout/stderr).
-    if (stdinFd[0] != STDIN_FILENO &&
-        stdinFd[0] != STDOUT_FILENO &&
-        stdinFd[0] != STDERR_FILENO) {
-      while (::close(stdinFd[0]) == -1 && errno == EINTR);
-    }
-    if (stdoutFd[1] != STDIN_FILENO &&
-        stdoutFd[1] != STDOUT_FILENO &&
-        stdoutFd[1] != STDERR_FILENO) {
-      while (::close(stdoutFd[1]) == -1 && errno == EINTR);
-    }
-    if (stderrFd[1] != STDIN_FILENO &&
-        stderrFd[1] != STDOUT_FILENO &&
-        stderrFd[1] != STDERR_FILENO) {
-      while (::close(stderrFd[1]) == -1 && errno == EINTR);
-    }
-
-    if (setup.isSome()) {
-      int status = setup.get()();
-      if (status != 0) {
-        _exit(status);
-      }
-    }
-
-    // TODO(jieyu): Consider providing an optional way to launch the
-    // subprocess without using the shell (similar to 'shell=False'
-    // used in python subprocess.Popen).
-    execle("/bin/sh", "sh", "-c", command.c_str(), (char*) NULL, envp());
-
-    ABORT("Failed to execle '/bin/sh -c ", command.c_str(), "'\n");
-  }
-
-  // Parent.
   // Close the file descriptors that are created by this function. For
   // pipes, we close the child ends and store the parent ends (see the
   // code below).

http://git-wip-us.apache.org/repos/asf/mesos/blob/1b0fdf01/3rdparty/libprocess/src/tests/subprocess_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/subprocess_tests.cpp 
b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
index 7dfa384..98a4e44 100644
--- a/3rdparty/libprocess/src/tests/subprocess_tests.cpp
+++ b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
@@ -509,6 +509,8 @@ struct Flags : public flags::FlagsBase
     add(&b, "b", "bool");
     add(&i, "i", "int");
     add(&s, "s", "string");
+    add(&s2, "s2", "string with single quote");
+    add(&s3, "s3", "string with double quote");
     add(&d, "d", "Duration");
     add(&y, "y", "Bytes");
     add(&j, "j", "JSON::Object");
@@ -517,6 +519,8 @@ struct Flags : public flags::FlagsBase
   Option<bool> b;
   Option<int> i;
   Option<string> s;
+  Option<string> s2;
+  Option<string> s3;
   Option<Duration> d;
   Option<Bytes> y;
   Option<JSON::Object> j;
@@ -531,6 +535,8 @@ TEST_F(SubprocessTest, Flags)
   flags.b = true;
   flags.i = 42;
   flags.s = "hello";
+  flags.s2 = "we're";
+  flags.s3 = "\"geek\"";
   flags.d = Seconds(10);
   flags.y = Bytes(100);
 
@@ -555,7 +561,8 @@ TEST_F(SubprocessTest, Flags)
   string out = path::join(os::getcwd(), "stdout");
 
   Try<Subprocess> s = subprocess(
-      "echo",
+      "/bin/echo",
+      vector<string>(1, "echo"),
       Subprocess::PIPE(),
       Subprocess::PATH(out),
       Subprocess::PIPE(),
@@ -597,6 +604,8 @@ TEST_F(SubprocessTest, Flags)
   EXPECT_EQ(flags.b, flags2.b);
   EXPECT_EQ(flags.i, flags2.i);
   EXPECT_EQ(flags.s, flags2.s);
+  EXPECT_EQ(flags.s2, flags2.s2);
+  EXPECT_EQ(flags.s3, flags2.s3);
   EXPECT_EQ(flags.d, flags2.d);
   EXPECT_EQ(flags.y, flags2.y);
   EXPECT_EQ(flags.j, flags2.j);
@@ -623,7 +632,6 @@ TEST_F(SubprocessTest, Environment)
       Subprocess::PIPE(),
       Subprocess::PIPE(),
       Subprocess::PIPE(),
-      None(),
       environment);
 
   ASSERT_SOME(s);
@@ -654,7 +662,6 @@ TEST_F(SubprocessTest, Environment)
       Subprocess::PIPE(),
       Subprocess::PIPE(),
       Subprocess::PIPE(),
-      None(),
       environment);
 
   ASSERT_SOME(s);
@@ -692,7 +699,6 @@ TEST_F(SubprocessTest, EnvironmentWithSpaces)
       Subprocess::PIPE(),
       Subprocess::PIPE(),
       Subprocess::PIPE(),
-      None(),
       environment);
 
   ASSERT_SOME(s);
@@ -730,7 +736,6 @@ TEST_F(SubprocessTest, EnvironmentWithSpacesAndQuotes)
       Subprocess::PIPE(),
       Subprocess::PIPE(),
       Subprocess::PIPE(),
-      None(),
       environment);
 
   ASSERT_SOME(s);
@@ -770,7 +775,6 @@ TEST_F(SubprocessTest, EnvironmentOverride)
       Subprocess::PIPE(),
       Subprocess::PIPE(),
       Subprocess::PIPE(),
-      None(),
       environment);
 
   ASSERT_SOME(s);
@@ -820,7 +824,6 @@ TEST_F(SubprocessTest, Setup)
       Subprocess::PIPE(),
       Subprocess::PIPE(),
       None(),
-      None(),
       lambda::bind(&setupChdir, directory.get()));
 
   ASSERT_SOME(s);
@@ -862,7 +865,6 @@ TEST_F(SubprocessTest, SetupStatus)
       Subprocess::PIPE(),
       Subprocess::PIPE(),
       None(),
-      None(),
       lambda::bind(&setupStatus, 1));
 
   ASSERT_SOME(s);
@@ -889,7 +891,6 @@ TEST_F(SubprocessTest, SetupStatus)
       Subprocess::PIPE(),
       Subprocess::PIPE(),
       None(),
-      None(),
       lambda::bind(&setupStatus, 0));
 
   ASSERT_SOME(s);

Reply via email to