This is an automated email from the ASF dual-hosted git repository. bmahler pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 446280e99723f105b581abf59b5086fb06d218f1 Author: Benjamin Mahler <bmah...@apache.org> AuthorDate: Tue Jan 12 13:17:53 2021 -0500 Added support for multiple event loop / IO threads for libev. The current approach to I/O in libprocess, with a single thread performing all of the I/O polling and I/O syscalls, cannot keep up with the I/O load on massive scale mesos clusters (which use libev rather than libevent). This adds support via a LIBPROCESS_LIBEV_NUM_IO_THREADS env variable for configuring the number of threads running libev event loops, which allows users to spread the IO load across multiple threads. Review: https://reviews.apache.org/r/73136 --- 3rdparty/libprocess/src/posix/libev/libev.cpp | 163 +++++++++++++++++---- 3rdparty/libprocess/src/posix/libev/libev.hpp | 68 ++++++--- 3rdparty/libprocess/src/posix/libev/libev_poll.cpp | 11 +- 3 files changed, 186 insertions(+), 56 deletions(-) diff --git a/3rdparty/libprocess/src/posix/libev/libev.cpp b/3rdparty/libprocess/src/posix/libev/libev.cpp index 2343a54..3e818ad 100644 --- a/3rdparty/libprocess/src/posix/libev/libev.cpp +++ b/3rdparty/libprocess/src/posix/libev/libev.cpp @@ -13,23 +13,37 @@ #include <ev.h> #include <signal.h> +#include <glog/logging.h> + #include <condition_variable> #include <mutex> #include <queue> +#include <string> +#include <thread> +#include <vector> #include <stout/duration.hpp> +#include <stout/exit.hpp> #include <stout/lambda.hpp> #include <stout/nothing.hpp> +#include <stout/numify.hpp> +#include <stout/option.hpp> + +#include <stout/os/getenv.hpp> #include "event_loop.hpp" #include "libev.hpp" +using std::string; +using std::thread; +using std::vector; + namespace process { namespace internal { // We need a latch (available in C++20 but not in C++11) to -// wait for the loop to finish, so define a simple one here. +// wait for all loops to finish, so define a simple one here. // To keep this simple, this only allows 1 triggering thread // and 1 waiting thread. 
// @@ -60,31 +74,37 @@ private: } // namespace internal { -ev_async async_watcher; -// We need an asynchronous watcher to receive the request to shutdown. -ev_async shutdown_watcher; -// Define the initial values for all of the declarations made in -// libev.hpp (since these need to live in the static data space). -struct ev_loop* loop = nullptr; +size_t num_loops = 1L; -internal::Latch* loop_destroy_latch = nullptr; +// Array of async watchers to receive the request to shutdown +// each event loop. +ev_async* shutdown_watchers = nullptr; -std::mutex* functions_mutex = new std::mutex(); +ev_async* async_watchers = nullptr; -std::queue<lambda::function<void()>>* functions = - new std::queue<lambda::function<void()>>(); +struct ev_loop** loops = nullptr; -thread_local bool* _in_event_loop_ = nullptr; +internal::Latch* loop_destroy_latch = nullptr; +std::mutex* functions_mutexes = nullptr; +std::queue<lambda::function<void()>>* functions = nullptr; -void handle_async(struct ev_loop* loop, ev_async* _, int revents) +thread_local struct ev_loop* _in_event_loop_ = nullptr; + + +void handle_async(struct ev_loop* loop, ev_async* async, int revents) { + // This is a hack to use the data pointer as integer storage. + size_t loop_index = reinterpret_cast<size_t>(async->data); + std::queue<lambda::function<void()>> run_functions; - synchronized (functions_mutex) { - // Swap the functions into a temporary queue so that we can invoke - // them outside of the mutex. - std::swap(run_functions, *functions); + + // Swap the functions into a temporary queue so that we can invoke + // them outside of the mutex. 
+ { + std::lock_guard<std::mutex> guard(functions_mutexes[loop_index]); + std::swap(run_functions, functions[loop_index]); } // Running the functions outside of the mutex reduces locking @@ -108,15 +128,61 @@ void handle_shutdown(struct ev_loop* loop, ev_async* _, int revents) } +LoopIndex get_loop(int_fd fd) +{ + return LoopIndex(static_cast<size_t>(fd) % num_loops); +} + + void EventLoop::initialize() { - loop = CHECK_NOTNULL(ev_loop_new(EVFLAG_AUTO)); + // TODO(bmahler): Since this is a new feature, we stick to the + // old behavior of a single event loop thread. But, once this + // code is known to be stable and helpful, we can consider + // increasing the default. + num_loops = 1; + + // TODO(bmahler): Load this via a Flag object to eliminate this boilerplate. + constexpr char env_var[] = "LIBPROCESS_LIBEV_NUM_IO_THREADS"; + Option<string> value = os::getenv(env_var); + if (value.isSome()) { + constexpr size_t maxval = 1024; + Try<size_t> number = numify<size_t>(value->c_str()); + if (number.isSome() && number.get() > 0 && number.get() <= maxval) { + VLOG(1) << "Overriding default number of libev io threads" + << " (" << num_loops << "), using the value " + << env_var << "=" << *number << " instead"; + num_loops = number.get(); + } else { + EXIT(EXIT_FAILURE) + << "Invalid value '" << value.get() << "' for " << env_var + << "; Valid values are integers in the range 1 to " << maxval; + } + } + + loops = new struct ev_loop*[num_loops]; - ev_async_init(&async_watcher, handle_async); - ev_async_init(&shutdown_watcher, handle_shutdown); + for (size_t i = 0; i < num_loops; ++i) { + loops[i] = CHECK_NOTNULL(ev_loop_new(EVFLAG_AUTO)); + } + + functions_mutexes = new std::mutex[num_loops]; + functions = new std::queue<lambda::function<void()>>[num_loops]; + + async_watchers = new struct ev_async[num_loops]; + shutdown_watchers = new struct ev_async[num_loops]; - ev_async_start(loop, &async_watcher); - ev_async_start(loop, &shutdown_watcher); + for (size_t i = 0; i < 
num_loops; ++i) { + // We need to pass the loop index as data, so instead of putting + // it on the heap we just use the pointer as integer storage. + async_watchers[i].data = reinterpret_cast<void*>(i); + + ev_async_init(&async_watchers[i], handle_async); + ev_async_init(&shutdown_watchers[i], handle_shutdown); + + ev_async_start(loops[i], &async_watchers[i]); + ev_async_start(loops[i], &shutdown_watchers[i]); + } loop_destroy_latch = new internal::Latch(); } @@ -136,6 +202,7 @@ void handle_delay(struct ev_loop* loop, ev_timer* timer, int revents) Future<Nothing> delay( + struct ev_loop* loop, const Duration& duration, const lambda::function<void()>& function) { @@ -167,8 +234,11 @@ void EventLoop::delay( const Duration& duration, const lambda::function<void()>& function) { + // TODO(bmahler): The current approach is to send all timers to the + // loop 0, but we could round robin them. run_in_event_loop<Nothing>( - lambda::bind(&internal::delay, duration, function)); + get_loop(0), + lambda::bind(&internal::delay, lambda::_1, duration, function)); } @@ -181,9 +251,25 @@ double EventLoop::time() void EventLoop::run() { - __in_event_loop__ = true; - ev_loop(loop, 0); - __in_event_loop__ = false; + // Loop 0 runs on this thread, the others get their own threads. + vector<thread> threads; + threads.reserve(num_loops - 1); + + for (size_t i = 1; i < num_loops; ++i) { + threads.push_back(thread([i]() { + _in_event_loop_ = loops[i]; + ev_loop(loops[i], 0); + _in_event_loop_ = nullptr; + })); + } + + _in_event_loop_ = loops[0]; + ev_loop(loops[0], 0); + _in_event_loop_ = nullptr; + + foreach (thread& t, threads) { + t.join(); + } loop_destroy_latch->trigger(); } @@ -191,15 +277,34 @@ void EventLoop::run() void EventLoop::stop() { - ev_async_send(loop, &shutdown_watcher); + // Stop the loops and wait for them to finish before destroying them. 
+ for (size_t i = 0; i < num_loops; ++i) { + ev_async_send(loops[i], &shutdown_watchers[i]); + } loop_destroy_latch->wait(); delete loop_destroy_latch; loop_destroy_latch = nullptr; - ev_loop_destroy(loop); - loop = nullptr; + for (size_t i = 0; i < num_loops; ++i) { + ev_loop_destroy(loops[i]); + } + + delete[] loops; + loops = nullptr; + + delete[] functions; + functions = nullptr; + + delete[] functions_mutexes; + functions_mutexes = nullptr; + + delete[] async_watchers; + async_watchers = nullptr; + + delete[] shutdown_watchers; + shutdown_watchers = nullptr; } } // namespace process { diff --git a/3rdparty/libprocess/src/posix/libev/libev.hpp b/3rdparty/libprocess/src/posix/libev/libev.hpp index 739f634..92fd22c 100644 --- a/3rdparty/libprocess/src/posix/libev/libev.hpp +++ b/3rdparty/libprocess/src/posix/libev/libev.hpp @@ -26,48 +26,70 @@ namespace process { -// Event loop. -extern struct ev_loop* loop; +// Array of event loops. +extern struct ev_loop** loops; -// Asynchronous watcher for interrupting loop to specifically deal +// Array of async watchers for interrupting loops to specifically deal // with IO watchers and functions (via run_in_event_loop). -extern ev_async async_watcher; +extern ev_async* async_watchers; -// Queue of functions to be invoked asynchronously within the vent -// loop (protected by 'watchers' above). -extern std::mutex* functions_mutex; +// Array of queues of functions to be invoked asynchronously within the +// event loops (each queue is protected by a mutex). +extern std::mutex* functions_mutexes; extern std::queue<lambda::function<void()>>* functions; -// Per thread bool pointer. We use a pointer to lazily construct the -// actual bool. -extern thread_local bool* _in_event_loop_; +// Per thread loop pointer. If this thread is currently inside an +// event loop, then this will be set to point to the loop that it's +// executing inside. Otherwise, will be set to null. 
+extern thread_local struct ev_loop* _in_event_loop_; + +// This is a wrapper type of the loop index to ensure that +// `get_loop(fd)` is called to select the correct loop for +// `run_in_event_loop(...)`. +struct LoopIndex +{ + size_t index; + +private: + explicit LoopIndex(size_t index) : index(index) {} + LoopIndex() = delete; + friend LoopIndex get_loop(int_fd fd); +}; -#define __in_event_loop__ *(_in_event_loop_ == nullptr ? \ - _in_event_loop_ = new bool(false) : _in_event_loop_) + +// Since multiple event loops are supported, and fds are assigned +// to loops, callers must first get the loop based on the fd. +LoopIndex get_loop(int_fd fd); // Wrapper around function we want to run in the event loop. template <typename T> void _run_in_event_loop( - const lambda::function<Future<T>()>& f, + struct ev_loop* loop, + const lambda::function<Future<T>(struct ev_loop*)>& f, const Owned<Promise<T>>& promise) { // Don't bother running the function if the future has been discarded. if (promise->future().hasDiscard()) { promise->discard(); } else { - promise->set(f()); + promise->set(f(loop)); } } -// Helper for running a function in the event loop. +// Helper for running a function in one of the event loops. template <typename T> -Future<T> run_in_event_loop(const lambda::function<Future<T>()>& f) +Future<T> run_in_event_loop( + const LoopIndex loop_index, + const lambda::function<Future<T>(struct ev_loop*)>& f) { - // If this is already the event loop then just run the function. - if (__in_event_loop__) { - return f(); + struct ev_loop* loop = loops[loop_index.index]; + + // If this is already the event loop that we're trying to run the + // function within, then just run the function. + if (_in_event_loop_ == loop) { + return f(loop); } Owned<Promise<T>> promise(new Promise<T>()); @@ -75,12 +97,14 @@ Future<T> run_in_event_loop(const lambda::function<Future<T>()>& f) Future<T> future = promise->future(); // Enqueue the function. 
- synchronized (functions_mutex) { - functions->push(lambda::bind(&_run_in_event_loop<T>, f, promise)); + { + std::lock_guard<std::mutex> guard(functions_mutexes[loop_index.index]); + functions[loop_index.index].push( + lambda::bind(&_run_in_event_loop<T>, loop, f, promise)); } // Interrupt the loop. - ev_async_send(loop, &async_watcher); + ev_async_send(loop, &async_watchers[loop_index.index]); return future; } diff --git a/3rdparty/libprocess/src/posix/libev/libev_poll.cpp b/3rdparty/libprocess/src/posix/libev/libev_poll.cpp index 96913a6..11ef053 100644 --- a/3rdparty/libprocess/src/posix/libev/libev_poll.cpp +++ b/3rdparty/libprocess/src/posix/libev/libev_poll.cpp @@ -90,13 +90,13 @@ namespace io { namespace internal { // Helper/continuation of 'poll' on future discard. -void _poll(const std::shared_ptr<ev_async>& async) +void _poll(struct ev_loop* loop, const std::shared_ptr<ev_async>& async) { ev_async_send(loop, async.get()); } -Future<short> poll(int_fd fd, short events) +Future<short> poll(struct ev_loop* loop, int_fd fd, short events) { Poll* poll = new Poll(); @@ -117,7 +117,7 @@ Future<short> poll(int_fd fd, short events) // in this case while we will interrupt the event loop since the // async watcher has already been stopped we won't cause // 'discard_poll' to get invoked. - future.onDiscard(lambda::bind(&_poll, poll->watcher.async)); + future.onDiscard(lambda::bind(&_poll, loop, poll->watcher.async)); // Initialize and start the I/O watcher. ev_io_init(poll->watcher.io.get(), polled, fd, events); @@ -134,8 +134,9 @@ Future<short> poll(int_fd fd, short events) process::initialize(); // TODO(benh): Check if the file descriptor is non-blocking? - - return run_in_event_loop<short>(lambda::bind(&internal::poll, fd, events)); + return run_in_event_loop<short>( + get_loop(fd), + lambda::bind(&internal::poll, lambda::_1, fd, events)); } } // namespace io {