This is an automated email from the ASF dual-hosted git repository. jieyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push: new fd76294 Call any function in a specified namespace. fd76294 is described below commit fd762943c928be810ba4c2c0aab9e9a6ebc399e8 Author: Sergey Urbanovich <sergey.urbanov...@gmail.com> AuthorDate: Mon Aug 13 16:20:27 2018 -0700 Call any function in a specified namespace. The NamespaceRunner runs any function in a specified namespace. To do that it manages a separate thread which would be re-associated with that namespace. Review: https://reviews.apache.org/r/68053/ --- src/linux/ns.hpp | 143 +++++++++++++++++++++++++++++++++++ src/tests/containerizer/ns_tests.cpp | 24 ++++++ 2 files changed, 167 insertions(+) diff --git a/src/linux/ns.hpp b/src/linux/ns.hpp index 0b4136b..bb40038 100644 --- a/src/linux/ns.hpp +++ b/src/linux/ns.hpp @@ -26,11 +26,16 @@ #include <sys/syscall.h> +#include <queue> #include <set> #include <string> +#include <thread> + +#include <process/future.hpp> #include <stout/lambda.hpp> #include <stout/nothing.hpp> +#include <stout/option.hpp> #include <stout/result.hpp> #include <stout/try.hpp> @@ -178,6 +183,144 @@ Try<pid_t> clone( // flags, e.g., CLONE_NEWNS | CLONE_NEWNET. std::string stringify(int flags); + +// The NamespaceRunner runs any function in a specified namespace. +// To do that it manages a separate thread which would be re-associated +// with that namespace. +class NamespaceRunner +{ +public: + NamespaceRunner() + { + // Start the looper thread. + thread.reset(new std::thread(&NamespaceRunner::loop, this)); + } + + ~NamespaceRunner() + { + // Shutdown the queue. + queue.shutdown(); + // Wait for the thread to complete. + thread->join(); + thread.reset(); + } + + // Run any function in a specified namespace. 
+ template <typename T> + process::Future<T> run( + const std::string& path, + const std::string& ns, + const lambda::function<Try<T>()>& func) + { + std::shared_ptr<process::Promise<T>> promise( + new process::Promise<T>); + process::Future<T> future = promise->future(); + + // Put a function to the queue, the function will be called + // in the thread. The thread will be re-associated with the + // specified namespace. + queue.put([=]{ + Try<Nothing> setns = ::ns::setns(path, ns, false); + if (setns.isError()) { + promise->fail(setns.error()); + } else { + promise->set(func()); + } + }); + + return future; + } + +private: + typedef lambda::function<void()> Func; + + // The thread loop. + void loop() + { + for (;;) { + // Get a function from the queue. + Option<Func> func = queue.get(); + + // Stop the thread if the queue has been shut down. + if (func.isNone()) { + break; + } + + // Call the function, it re-associates the thread with the + // specified namespace and calls the initial user function. + func.get()(); + } + } + + // It's not safe to use process::Queue when not all of its callers are + // managed by libprocess. Calling Future::await() in looper thread + // might cause the looper thread to be donated to a libprocess Process. + // If that Process is very busy (e.g., master or agent Process), it's + // possible that the looper thread will never regain control. + // + // ProcessingQueue uses mutex and condition variable to solve this + // problem. ProcessingQueue::get() can block the thread. The main + // use cases for the class are thread workers and thread pools. + template <typename T> + class ProcessingQueue + { + public: + ProcessingQueue() : finished(false) {} + + ~ProcessingQueue() = default; + + // Add an element to the queue and notify one client. + void put(T&& t) + { + synchronized (mutex) { + queue.push(std::forward<T>(t)); + cond.notify_one(); + } + } + + // NOTE: This function blocks the thread. 
It returns the oldest + // element from the queue and returns None() if the queue has + // been shut down. + Option<T> get() + { + synchronized (mutex) { + // Wait for either a new queue element or queue shutdown. + while (queue.empty() && !finished) { + synchronized_wait(&cond, &mutex); + } + + if (finished) { + // The queue has been shut down. + return None(); + } + + // Return the oldest element from the queue. + T t = std::move(queue.front()); + queue.pop(); + return Some(std::move(t)); + } + } + + // Shut down the queue and notify all clients. + void shutdown() { + synchronized (mutex) { + finished = true; + std::queue<T>().swap(queue); + cond.notify_all(); + } + } + + private: + std::mutex mutex; + std::condition_variable cond; + std::queue<T> queue; + bool finished; + }; + + ProcessingQueue<Func> queue; + std::unique_ptr<std::thread> thread; +}; + } // namespace ns { #endif // __LINUX_NS_HPP__ diff --git a/src/tests/containerizer/ns_tests.cpp b/src/tests/containerizer/ns_tests.cpp index fa4349e..14aad89 100644 --- a/src/tests/containerizer/ns_tests.cpp +++ b/src/tests/containerizer/ns_tests.cpp @@ -266,6 +266,30 @@ TEST(NsTest, ROOT_clone) EXPECT_NONE(status.get()); } + +// Test the ns::NamespaceRunner(). +TEST(NsTest, ROOT_NamespaceRunner) +{ + process::Future<int> r; + + // Initialize the Runner. + ns::NamespaceRunner runner; + + // Run a dummy function in a networking namespace. + lambda::function<Try<int>()> f = []() -> Try<int> { + return 42; + }; + + r = runner.run("/proc/self/ns/net", "net", f); + AWAIT_READY(r); + EXPECT_EQ(r.get(), 42); + + // Run the function with an invalid namespace type. + r = runner.run("/proc/self/ns/net", "mnt", f); + AWAIT_FAILED(r); + EXPECT_EQ(r.failure(), "Invalid argument"); +} + } // namespace tests { } // namespace internal { } // namespace mesos {