Repository: mesos Updated Branches: refs/heads/master b2d13bc3b -> d0046dca7
Refactored Subprocess to support execve style launch and customized clone function. Review: https://reviews.apache.org/r/22831 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1b0fdf01 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1b0fdf01 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1b0fdf01 Branch: refs/heads/master Commit: 1b0fdf01b47591ebd966eba3e0df360caa40888e Parents: b2d13bc Author: Jie Yu <yujie....@gmail.com> Authored: Fri Jun 20 12:28:02 2014 -0700 Committer: Jie Yu <yujie....@gmail.com> Committed: Wed Jun 25 14:31:29 2014 -0700 ---------------------------------------------------------------------- .../libprocess/include/process/subprocess.hpp | 101 ++++++++-- 3rdparty/libprocess/src/subprocess.cpp | 201 ++++++++++++------- .../libprocess/src/tests/subprocess_tests.cpp | 19 +- 3 files changed, 223 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/1b0fdf01/3rdparty/libprocess/include/process/subprocess.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/subprocess.hpp b/3rdparty/libprocess/include/process/subprocess.hpp index d6e2c1f..7ff5a10 100644 --- a/3rdparty/libprocess/include/process/subprocess.hpp +++ b/3rdparty/libprocess/include/process/subprocess.hpp @@ -7,6 +7,7 @@ #include <map> #include <string> +#include <vector> #include <process/future.hpp> @@ -38,17 +39,25 @@ public: // 3. FD: Redirect to an open file descriptor. 
class IO { + public: + bool isPipe() const { return mode == PIPE; } + bool isPath() const { return mode == PATH; } + bool isFd() const { return mode == FD; } + private: friend class Subprocess; friend Try<Subprocess> subprocess( - const std::string& command, - const IO& in, - const IO& out, - const IO& err, + const std::string& path, + std::vector<std::string> argv, + const Subprocess::IO& in, + const Subprocess::IO& out, + const Subprocess::IO& err, const Option<flags::FlagsBase>& flags, const Option<std::map<std::string, std::string> >& environment, - const Option<lambda::function<int()> >& setup); + const Option<lambda::function<int()> >& setup, + const Option<lambda::function< + pid_t(const lambda::function<int()>&)> >& clone); enum Mode { @@ -95,13 +104,16 @@ public: private: friend Try<Subprocess> subprocess( - const std::string& command, - const IO& in, - const IO& out, - const IO& err, + const std::string& path, + std::vector<std::string> argv, + const Subprocess::IO& in, + const Subprocess::IO& out, + const Subprocess::IO& err, const Option<flags::FlagsBase>& flags, const Option<std::map<std::string, std::string> >& environment, - const Option<lambda::function<int()> >& setup); + const Option<lambda::function<int()> >& setup, + const Option<lambda::function< + pid_t(const lambda::function<int()>&)> >& clone); struct Data { @@ -141,29 +153,86 @@ private: // must not contain any async unsafe code. // TODO(dhamon): Add an option to not combine the two environments. 
Try<Subprocess> subprocess( - const std::string& command, + const std::string& path, + std::vector<std::string> argv, const Subprocess::IO& in, const Subprocess::IO& out, const Subprocess::IO& err, const Option<flags::FlagsBase>& flags = None(), const Option<std::map<std::string, std::string> >& environment = None(), - const Option<lambda::function<int()> >& setup = None()); + const Option<lambda::function<int()> >& setup = None(), + const Option<lambda::function< + pid_t(const lambda::function<int()>&)> >& clone = None()); inline Try<Subprocess> subprocess( - const std::string& command, + const std::string& path, + std::vector<std::string> argv, const Option<flags::FlagsBase>& flags = None(), const Option<std::map<std::string, std::string> >& environment = None(), - const Option<lambda::function<int()> >& setup = None()) + const Option<lambda::function<int()> >& setup = None(), + const Option<lambda::function< + pid_t(const lambda::function<int()>&)> >& clone = None()) { return subprocess( - command, + path, + argv, Subprocess::FD(STDIN_FILENO), Subprocess::FD(STDOUT_FILENO), Subprocess::FD(STDERR_FILENO), flags, environment, - setup); + setup, + clone); +} + + +// Overloads for launching a shell command. Currently, we do not +// support flags for shell command variants due to the complexity +// involved in escaping quotes in flags. 
+inline Try<Subprocess> subprocess( + const std::string& command, + const Subprocess::IO& in, + const Subprocess::IO& out, + const Subprocess::IO& err, + const Option<std::map<std::string, std::string> >& environment = None(), + const Option<lambda::function<int()> >& setup = None(), + const Option<lambda::function< + pid_t(const lambda::function<int()>&)> >& clone = None()) +{ + std::vector<std::string> argv(3); + argv[0] = "sh"; + argv[1] = "-c"; + argv[2] = command; + + return subprocess( + "/bin/sh", + argv, + in, + out, + err, + None(), + environment, + setup, + clone); +} + + +inline Try<Subprocess> subprocess( + const std::string& command, + const Option<std::map<std::string, std::string> >& environment = None(), + const Option<lambda::function<int()> >& setup = None(), + const Option<lambda::function< + pid_t(const lambda::function<int()>&)> >& clone = None()) +{ + return subprocess( + command, + Subprocess::FD(STDIN_FILENO), + Subprocess::FD(STDOUT_FILENO), + Subprocess::FD(STDERR_FILENO), + environment, + setup, + clone); } } // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/1b0fdf01/3rdparty/libprocess/src/subprocess.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/subprocess.cpp b/3rdparty/libprocess/src/subprocess.cpp index 6ac7b5b..68bfd5d 100644 --- a/3rdparty/libprocess/src/subprocess.cpp +++ b/3rdparty/libprocess/src/subprocess.cpp @@ -24,6 +24,7 @@ using std::map; using std::string; +using std::vector; namespace process { namespace internal { @@ -85,15 +86,99 @@ static Try<Nothing> cloexec(int stdinFd[2], int stdoutFd[2], int stderrFd[2]) } // namespace internal { -// Runs the provided command in a subprocess. +static pid_t defaultClone(const lambda::function<int()>& func) +{ + pid_t pid = ::fork(); + if (pid == -1) { + return -1; + } else if (pid == 0) { + // Child. + ::exit(func()); + return UNREACHABLE(); + } else { + // Parent. 
+ return pid; + } +} + + +// The main entry of the child process. Note that this function has to +// be async signal safe. +static int childMain( + const string& path, + char** argv, + const Subprocess::IO& in, + const Subprocess::IO& out, + const Subprocess::IO& err, + os::ExecEnv* envp, + const Option<lambda::function<int()> >& setup, + int stdinFd[2], + int stdoutFd[2], + int stderrFd[2]) +{ + // Close parent's end of the pipes. + if (in.isPipe()) { + while (::close(stdinFd[1]) == -1 && errno == EINTR); + } + if (out.isPipe()) { + while (::close(stdoutFd[0]) == -1 && errno == EINTR); + } + if (err.isPipe()) { + while (::close(stderrFd[0]) == -1 && errno == EINTR); + } + + // Redirect I/O for stdin/stdout/stderr. + while (::dup2(stdinFd[0], STDIN_FILENO) == -1 && errno == EINTR); + while (::dup2(stdoutFd[1], STDOUT_FILENO) == -1 && errno == EINTR); + while (::dup2(stderrFd[1], STDERR_FILENO) == -1 && errno == EINTR); + + // Close the copies. We need to make sure that we do not close the + // file descriptor assigned to stdin/stdout/stderr in case the + // parent has closed stdin/stdout/stderr when calling this + // function (in that case, a dup'ed file descriptor may have the + // same file descriptor number as stdin/stdout/stderr). 
+ if (stdinFd[0] != STDIN_FILENO && + stdinFd[0] != STDOUT_FILENO && + stdinFd[0] != STDERR_FILENO) { + while (::close(stdinFd[0]) == -1 && errno == EINTR); + } + if (stdoutFd[1] != STDIN_FILENO && + stdoutFd[1] != STDOUT_FILENO && + stdoutFd[1] != STDERR_FILENO) { + while (::close(stdoutFd[1]) == -1 && errno == EINTR); + } + if (stderrFd[1] != STDIN_FILENO && + stderrFd[1] != STDOUT_FILENO && + stderrFd[1] != STDERR_FILENO) { + while (::close(stderrFd[1]) == -1 && errno == EINTR); + } + + if (setup.isSome()) { + int status = setup.get()(); + if (status != 0) { + _exit(status); + } + } + + execve(path.c_str(), argv, (*envp)()); + + ABORT("Failed to execve in childMain\n"); + + return UNREACHABLE(); +} + + Try<Subprocess> subprocess( - const string& _command, + const string& path, + vector<string> argv, const Subprocess::IO& in, const Subprocess::IO& out, const Subprocess::IO& err, const Option<flags::FlagsBase>& flags, const Option<map<string, string> >& environment, - const Option<lambda::function<int()> >& setup) + const Option<lambda::function<int()> >& setup, + const Option<lambda::function< + pid_t(const lambda::function<int()>&)> >& _clone) { // File descriptors for redirecting stdin/stdout/stderr. These file // descriptors are used for different purposes depending on the @@ -116,7 +201,7 @@ Try<Subprocess> subprocess( break; } case Subprocess::IO::PIPE: { - if (pipe(stdinFd) == -1) { + if (::pipe(stdinFd) == -1) { return ErrnoError("Failed to create pipe"); } break; @@ -147,7 +232,7 @@ Try<Subprocess> subprocess( break; } case Subprocess::IO::PIPE: { - if (pipe(stdoutFd) == -1) { + if (::pipe(stdoutFd) == -1) { // Save the errno as 'close' below might overwrite it. ErrnoError error("Failed to create pipe"); internal::close(stdinFd, stdoutFd, stderrFd); @@ -186,7 +271,7 @@ Try<Subprocess> subprocess( break; } case Subprocess::IO::PIPE: { - if (pipe(stderrFd) == -1) { + if (::pipe(stderrFd) == -1) { // Save the errno as 'close' below might overwrite it. 
ErrnoError error("Failed to create pipe"); internal::close(stdinFd, stdoutFd, stderrFd); @@ -219,95 +304,65 @@ Try<Subprocess> subprocess( return Error("Failed to cloexec: " + cloexec.error()); } - // Prepare the command to execute. If the user specifies the - // 'flags', we will stringify it and append it to the command. - string command = _command; - + // Prepare the arguments. If the user specifies the 'flags', we will + // stringify them and append them to the existing arguments. if (flags.isSome()) { foreachpair (const string& name, const flags::Flag& flag, flags.get()) { Option<string> value = flag.stringify(flags.get()); if (value.isSome()) { - // TODO(jieyu): Need a better way to escape quotes. For - // example, what if 'value.get()' contains a single quote? - string argument = "--" + name + "='" + value.get() + "'"; - command = strings::join(" ", command, argument); + argv.push_back("--" + name + "=" + value.get()); } } } - // We need to do this construction before doing the fork as it + // The real arguments that will be passed to 'execve'. We need to + // construct them here before doing the clone as it might not be + // async signal safe. + char** _argv = new char*[argv.size() + 1]; + for (int i = 0; i < argv.size(); i++) { + _argv[i] = (char*) argv[i].c_str(); + } + _argv[argv.size()] = NULL; + + // We need to do this construction before doing the clone as it // might not be async-safe. // TODO(tillt): Consider optimizing this to not pass an empty map // into the constructor or even further to use execl instead of // execle once we have no user supplied environment. os::ExecEnv envp(environment.get(map<string, string>())); - pid_t pid; - if ((pid = fork()) == -1) { + // Determine the function to clone the child process. If the user + // does not specify the clone function, we will use the default. + lambda::function<pid_t(const lambda::function<int()>&)> clone = + (_clone.isSome() ? _clone.get() : defaultClone); + + // Now, clone the child process. 
+ pid_t pid = clone(lambda::bind( + &childMain, + path, + _argv, + in, + out, + err, + &envp, + setup, + stdinFd, + stdoutFd, + stderrFd)); + + delete[] _argv; + + if (pid == -1) { // Save the errno as 'close' below might overwrite it. - ErrnoError error("Failed to fork"); + ErrnoError error("Failed to clone"); internal::close(stdinFd, stdoutFd, stderrFd); return error; } + // Parent. Subprocess process; process.data->pid = pid; - if (process.data->pid == 0) { - // Child. - // Close parent's end of the pipes. - if (in.mode == Subprocess::IO::PIPE) { - while (::close(stdinFd[1]) == -1 && errno == EINTR); - } - if (out.mode == Subprocess::IO::PIPE) { - while (::close(stdoutFd[0]) == -1 && errno == EINTR); - } - if (err.mode == Subprocess::IO::PIPE) { - while (::close(stderrFd[0]) == -1 && errno == EINTR); - } - - // Redirect I/O for stdin/stdout/stderr. - while (::dup2(stdinFd[0], STDIN_FILENO) == -1 && errno == EINTR); - while (::dup2(stdoutFd[1], STDOUT_FILENO) == -1 && errno == EINTR); - while (::dup2(stderrFd[1], STDERR_FILENO) == -1 && errno == EINTR); - - // Close the copies. We need to make sure that we do not close the - // file descriptor assigned to stdin/stdout/stderr in case the - // parent has closed stdin/stdout/stderr when calling this - // function (in that case, a dup'ed file descriptor may have the - // same file descriptor number as stdin/stdout/stderr). 
- if (stdinFd[0] != STDIN_FILENO && - stdinFd[0] != STDOUT_FILENO && - stdinFd[0] != STDERR_FILENO) { - while (::close(stdinFd[0]) == -1 && errno == EINTR); - } - if (stdoutFd[1] != STDIN_FILENO && - stdoutFd[1] != STDOUT_FILENO && - stdoutFd[1] != STDERR_FILENO) { - while (::close(stdoutFd[1]) == -1 && errno == EINTR); - } - if (stderrFd[1] != STDIN_FILENO && - stderrFd[1] != STDOUT_FILENO && - stderrFd[1] != STDERR_FILENO) { - while (::close(stderrFd[1]) == -1 && errno == EINTR); - } - - if (setup.isSome()) { - int status = setup.get()(); - if (status != 0) { - _exit(status); - } - } - - // TODO(jieyu): Consider providing an optional way to launch the - // subprocess without using the shell (similar to 'shell=False' - // used in python subprocess.Popen). - execle("/bin/sh", "sh", "-c", command.c_str(), (char*) NULL, envp()); - - ABORT("Failed to execle '/bin/sh -c ", command.c_str(), "'\n"); - } - - // Parent. // Close the file descriptors that are created by this function. For // pipes, we close the child ends and store the parent ends (see the // code below). 
http://git-wip-us.apache.org/repos/asf/mesos/blob/1b0fdf01/3rdparty/libprocess/src/tests/subprocess_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/subprocess_tests.cpp b/3rdparty/libprocess/src/tests/subprocess_tests.cpp index 7dfa384..98a4e44 100644 --- a/3rdparty/libprocess/src/tests/subprocess_tests.cpp +++ b/3rdparty/libprocess/src/tests/subprocess_tests.cpp @@ -509,6 +509,8 @@ struct Flags : public flags::FlagsBase add(&b, "b", "bool"); add(&i, "i", "int"); add(&s, "s", "string"); + add(&s2, "s2", "string with single quote"); + add(&s3, "s3", "string with double quote"); add(&d, "d", "Duration"); add(&y, "y", "Bytes"); add(&j, "j", "JSON::Object"); @@ -517,6 +519,8 @@ struct Flags : public flags::FlagsBase Option<bool> b; Option<int> i; Option<string> s; + Option<string> s2; + Option<string> s3; Option<Duration> d; Option<Bytes> y; Option<JSON::Object> j; @@ -531,6 +535,8 @@ TEST_F(SubprocessTest, Flags) flags.b = true; flags.i = 42; flags.s = "hello"; + flags.s2 = "we're"; + flags.s3 = "\"geek\""; flags.d = Seconds(10); flags.y = Bytes(100); @@ -555,7 +561,8 @@ TEST_F(SubprocessTest, Flags) string out = path::join(os::getcwd(), "stdout"); Try<Subprocess> s = subprocess( - "echo", + "/bin/echo", + vector<string>(1, "echo"), Subprocess::PIPE(), Subprocess::PATH(out), Subprocess::PIPE(), @@ -597,6 +604,8 @@ TEST_F(SubprocessTest, Flags) EXPECT_EQ(flags.b, flags2.b); EXPECT_EQ(flags.i, flags2.i); EXPECT_EQ(flags.s, flags2.s); + EXPECT_EQ(flags.s2, flags2.s2); + EXPECT_EQ(flags.s3, flags2.s3); EXPECT_EQ(flags.d, flags2.d); EXPECT_EQ(flags.y, flags2.y); EXPECT_EQ(flags.j, flags2.j); @@ -623,7 +632,6 @@ TEST_F(SubprocessTest, Environment) Subprocess::PIPE(), Subprocess::PIPE(), Subprocess::PIPE(), - None(), environment); ASSERT_SOME(s); @@ -654,7 +662,6 @@ TEST_F(SubprocessTest, Environment) Subprocess::PIPE(), Subprocess::PIPE(), Subprocess::PIPE(), - None(), environment); ASSERT_SOME(s); 
@@ -692,7 +699,6 @@ TEST_F(SubprocessTest, EnvironmentWithSpaces) Subprocess::PIPE(), Subprocess::PIPE(), Subprocess::PIPE(), - None(), environment); ASSERT_SOME(s); @@ -730,7 +736,6 @@ TEST_F(SubprocessTest, EnvironmentWithSpacesAndQuotes) Subprocess::PIPE(), Subprocess::PIPE(), Subprocess::PIPE(), - None(), environment); ASSERT_SOME(s); @@ -770,7 +775,6 @@ TEST_F(SubprocessTest, EnvironmentOverride) Subprocess::PIPE(), Subprocess::PIPE(), Subprocess::PIPE(), - None(), environment); ASSERT_SOME(s); @@ -820,7 +824,6 @@ TEST_F(SubprocessTest, Setup) Subprocess::PIPE(), Subprocess::PIPE(), None(), - None(), lambda::bind(&setupChdir, directory.get())); ASSERT_SOME(s); @@ -862,7 +865,6 @@ TEST_F(SubprocessTest, SetupStatus) Subprocess::PIPE(), Subprocess::PIPE(), None(), - None(), lambda::bind(&setupStatus, 1)); ASSERT_SOME(s); @@ -889,7 +891,6 @@ TEST_F(SubprocessTest, SetupStatus) Subprocess::PIPE(), Subprocess::PIPE(), None(), - None(), lambda::bind(&setupStatus, 0)); ASSERT_SOME(s);