This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 446280e99723f105b581abf59b5086fb06d218f1
Author: Benjamin Mahler <bmah...@apache.org>
AuthorDate: Tue Jan 12 13:17:53 2021 -0500

    Added support for multiple event loop / IO threads for libev.
    
    The current approach to I/O in libprocess, with a single thread
    performing all of the I/O polling and I/O syscalls, cannot keep
    up with the I/O load on massive-scale Mesos clusters (which use
    libev rather than libevent).
    
    This adds support via a LIBPROCESS_LIBEV_NUM_IO_THREADS env variable
    for configuring the number of threads running libev event loops,
    which allows users to spread the I/O load across multiple threads.
    
    Review: https://reviews.apache.org/r/73136
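    
    As an illustration of the configuration described above (this sketch
    is not part of the commit): the thread count comes from the
    LIBPROCESS_LIBEV_NUM_IO_THREADS environment variable (default 1,
    capped at 1024 by the validation in libev.cpp), and each file
    descriptor is assigned to the loop at index fd % num_loops, mirroring
    the get_loop() helper added below. The standalone program below only
    demonstrates that mapping; the fd value is arbitrary.
    
        #include <cstddef>
        #include <cstdlib>
        #include <iostream>
        #include <string>
    
        int main()
        {
          // Default to a single event loop / IO thread, matching the commit.
          std::size_t num_loops = 1;
    
          if (const char* value = std::getenv("LIBPROCESS_LIBEV_NUM_IO_THREADS")) {
            // The real code also rejects values outside the range 1..1024.
            num_loops = std::stoul(value);
          }
    
          int fd = 42;  // Arbitrary example descriptor.
    
          // Descriptors are spread across loops by simple modulo hashing.
          std::size_t loop_index = static_cast<std::size_t>(fd) % num_loops;
    
          std::cout << "fd " << fd << " is handled by loop " << loop_index
                    << " of " << num_loops << std::endl;
    
          return 0;
        }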
---
 3rdparty/libprocess/src/posix/libev/libev.cpp      | 163 +++++++++++++++++----
 3rdparty/libprocess/src/posix/libev/libev.hpp      |  68 ++++++---
 3rdparty/libprocess/src/posix/libev/libev_poll.cpp |  11 +-
 3 files changed, 186 insertions(+), 56 deletions(-)

diff --git a/3rdparty/libprocess/src/posix/libev/libev.cpp b/3rdparty/libprocess/src/posix/libev/libev.cpp
index 2343a54..3e818ad 100644
--- a/3rdparty/libprocess/src/posix/libev/libev.cpp
+++ b/3rdparty/libprocess/src/posix/libev/libev.cpp
@@ -13,23 +13,37 @@
 #include <ev.h>
 #include <signal.h>
 
+#include <glog/logging.h>
+
 #include <condition_variable>
 #include <mutex>
 #include <queue>
+#include <string>
+#include <thread>
+#include <vector>
 
 #include <stout/duration.hpp>
+#include <stout/exit.hpp>
 #include <stout/lambda.hpp>
 #include <stout/nothing.hpp>
+#include <stout/numify.hpp>
+#include <stout/option.hpp>
+
+#include <stout/os/getenv.hpp>
 
 #include "event_loop.hpp"
 #include "libev.hpp"
 
+using std::string;
+using std::thread;
+using std::vector;
+
 namespace process {
 
 namespace internal {
 
 // We need a latch (available in C++20 but not in C++11) to
-// wait for the loop to finish, so define a simple one here.
+// wait for all loops to finish, so define a simple one here.
 // To keep this simple, this only allows 1 triggering thread
 // and 1 waiting thread.
 //
@@ -60,31 +74,37 @@ private:
 
 } // namespace internal {
 
-ev_async async_watcher;
-// We need an asynchronous watcher to receive the request to shutdown.
-ev_async shutdown_watcher;
 
-// Define the initial values for all of the declarations made in
-// libev.hpp (since these need to live in the static data space).
-struct ev_loop* loop = nullptr;
+size_t num_loops = 1L;
 
-internal::Latch* loop_destroy_latch = nullptr;
+// Array of async watchers to receive the request to shutdown
+// each event loop.
+ev_async* shutdown_watchers = nullptr;
 
-std::mutex* functions_mutex = new std::mutex();
+ev_async* async_watchers = nullptr;
 
-std::queue<lambda::function<void()>>* functions =
-  new std::queue<lambda::function<void()>>();
+struct ev_loop** loops = nullptr;
 
-thread_local bool* _in_event_loop_ = nullptr;
+internal::Latch* loop_destroy_latch = nullptr;
 
+std::mutex* functions_mutexes = nullptr;
+std::queue<lambda::function<void()>>* functions = nullptr;
 
-void handle_async(struct ev_loop* loop, ev_async* _, int revents)
+thread_local struct ev_loop* _in_event_loop_ = nullptr;
+
+
+void handle_async(struct ev_loop* loop, ev_async* async, int revents)
 {
+  // This is a hack to use the data pointer as integer storage.
+  size_t loop_index = reinterpret_cast<size_t>(async->data);
+
   std::queue<lambda::function<void()>> run_functions;
-  synchronized (functions_mutex) {
-    // Swap the functions into a temporary queue so that we can invoke
-    // them outside of the mutex.
-    std::swap(run_functions, *functions);
+
+  // Swap the functions into a temporary queue so that we can invoke
+  // them outside of the mutex.
+  {
+    std::lock_guard<std::mutex> guard(functions_mutexes[loop_index]);
+    std::swap(run_functions, functions[loop_index]);
   }
 
   // Running the functions outside of the mutex reduces locking
@@ -108,15 +128,61 @@ void handle_shutdown(struct ev_loop* loop, ev_async* _, int revents)
 }
 
 
+LoopIndex get_loop(int_fd fd)
+{
+  return LoopIndex(static_cast<size_t>(fd) % num_loops);
+}
+
+
 void EventLoop::initialize()
 {
-  loop = CHECK_NOTNULL(ev_loop_new(EVFLAG_AUTO));
+  // TODO(bmahler): Since this is a new feature, we stick to the
+  // old behavior of a single event loop thread. But, once this
+  // code is known to be stable and helpful, we can consider
+  // increasing the default.
+  num_loops = 1;
+
+  // TODO(bmahler): Load this via a Flag object to eliminate this boilerplate.
+  constexpr char env_var[] = "LIBPROCESS_LIBEV_NUM_IO_THREADS";
+  Option<string> value = os::getenv(env_var);
+  if (value.isSome()) {
+    constexpr size_t maxval = 1024;
+    Try<size_t> number = numify<size_t>(value->c_str());
+    if (number.isSome() && number.get() > 0 && number.get() <= maxval) {
+      VLOG(1) << "Overriding default number of libev io threads"
+              << " (" << num_loops << "), using the value "
+              << env_var << "=" << *number << " instead";
+      num_loops = number.get();
+    } else {
+      EXIT(EXIT_FAILURE)
+          << "Invalid value '" << value.get() << "' for " << env_var
+          << "; Valid values are integers in the range 1 to " << maxval;
+    }
+  }
+
+  loops = new struct ev_loop*[num_loops];
 
-  ev_async_init(&async_watcher, handle_async);
-  ev_async_init(&shutdown_watcher, handle_shutdown);
+  for (size_t i = 0; i < num_loops; ++i) {
+    loops[i] = CHECK_NOTNULL(ev_loop_new(EVFLAG_AUTO));
+  }
+
+  functions_mutexes = new std::mutex[num_loops];
+  functions = new std::queue<lambda::function<void()>>[num_loops];
+
+  async_watchers = new struct ev_async[num_loops];
+  shutdown_watchers = new struct ev_async[num_loops];
 
-  ev_async_start(loop, &async_watcher);
-  ev_async_start(loop, &shutdown_watcher);
+  for (size_t i = 0; i < num_loops; ++i) {
+    // We need to pass the loop index as data, so instead of putting
+    // it on the heap we just use the pointer as integer storage.
+    async_watchers[i].data = reinterpret_cast<void*>(i);
+
+    ev_async_init(&async_watchers[i], handle_async);
+    ev_async_init(&shutdown_watchers[i], handle_shutdown);
+
+    ev_async_start(loops[i], &async_watchers[i]);
+    ev_async_start(loops[i], &shutdown_watchers[i]);
+  }
 
   loop_destroy_latch = new internal::Latch();
 }
@@ -136,6 +202,7 @@ void handle_delay(struct ev_loop* loop, ev_timer* timer, int revents)
 
 
 Future<Nothing> delay(
+    struct ev_loop* loop,
     const Duration& duration,
     const lambda::function<void()>& function)
 {
@@ -167,8 +234,11 @@ void EventLoop::delay(
     const Duration& duration,
     const lambda::function<void()>& function)
 {
+  // TODO(bmahler): The current approach is to send all timers to the
+  // loop 0, but we could round robin them.
   run_in_event_loop<Nothing>(
-      lambda::bind(&internal::delay, duration, function));
+      get_loop(0),
+      lambda::bind(&internal::delay, lambda::_1, duration, function));
 }
 
 
@@ -181,9 +251,25 @@ double EventLoop::time()
 
 void EventLoop::run()
 {
-  __in_event_loop__ = true;
-  ev_loop(loop, 0);
-  __in_event_loop__ = false;
+  // Loop 0 runs on this thread, the others get their own threads.
+  vector<thread> threads;
+  threads.reserve(num_loops - 1);
+
+  for (size_t i = 1; i < num_loops; ++i) {
+    threads.push_back(thread([i]() {
+      _in_event_loop_ = loops[i];
+      ev_loop(loops[i], 0);
+      _in_event_loop_ = nullptr;
+    }));
+  }
+
+  _in_event_loop_ = loops[0];
+  ev_loop(loops[0], 0);
+  _in_event_loop_ = nullptr;
+
+  foreach (thread& t, threads) {
+    t.join();
+  }
 
   loop_destroy_latch->trigger();
 }
@@ -191,15 +277,34 @@ void EventLoop::run()
 
 void EventLoop::stop()
 {
-  ev_async_send(loop, &shutdown_watcher);
+  // Stop the loops and wait for them to finish before destroying them.
+  for (size_t i = 0; i < num_loops; ++i) {
+    ev_async_send(loops[i], &shutdown_watchers[i]);
+  }
 
   loop_destroy_latch->wait();
 
   delete loop_destroy_latch;
   loop_destroy_latch = nullptr;
 
-  ev_loop_destroy(loop);
-  loop = nullptr;
+  for (size_t i = 0; i < num_loops; ++i) {
+    ev_loop_destroy(loops[i]);
+  }
+
+  delete[] loops;
+  loops = nullptr;
+
+  delete[] functions;
+  functions = nullptr;
+
+  delete[] functions_mutexes;
+  functions_mutexes = nullptr;
+
+  delete[] async_watchers;
+  async_watchers = nullptr;
+
+  delete[] shutdown_watchers;
+  shutdown_watchers = nullptr;
 }
 
 } // namespace process {
diff --git a/3rdparty/libprocess/src/posix/libev/libev.hpp b/3rdparty/libprocess/src/posix/libev/libev.hpp
index 739f634..92fd22c 100644
--- a/3rdparty/libprocess/src/posix/libev/libev.hpp
+++ b/3rdparty/libprocess/src/posix/libev/libev.hpp
@@ -26,48 +26,70 @@
 
 namespace process {
 
-// Event loop.
-extern struct ev_loop* loop;
+// Array of event loops.
+extern struct ev_loop** loops;
 
-// Asynchronous watcher for interrupting loop to specifically deal
+// Array of async watchers for interrupting loops to specifically deal
 // with IO watchers and functions (via run_in_event_loop).
-extern ev_async async_watcher;
+extern ev_async* async_watchers;
 
-// Queue of functions to be invoked asynchronously within the vent
-// loop (protected by 'watchers' above).
-extern std::mutex* functions_mutex;
+// Array of queues of functions to be invoked asynchronously within the
+// event loops (each queue is protected by a mutex).
+extern std::mutex* functions_mutexes;
 extern std::queue<lambda::function<void()>>* functions;
 
-// Per thread bool pointer. We use a pointer to lazily construct the
-// actual bool.
-extern thread_local bool* _in_event_loop_;
+// Per thread loop pointer. If this thread is currently inside an
+// event loop, then this will be set to point to the loop that it's
+// executing inside. Otherwise, will be set to null.
+extern thread_local struct ev_loop* _in_event_loop_;
+
+// This is a wrapper type of the loop index to ensure that
+// `get_loop(fd)` is called to select the correct loop for
+// `run_in_event_loop(...)`.
+struct LoopIndex
+{
+  size_t index;
+
+private:
+  explicit LoopIndex(size_t index) : index(index) {}
+  LoopIndex() = delete;
+  friend LoopIndex get_loop(int_fd fd);
+};
 
-#define __in_event_loop__ *(_in_event_loop_ == nullptr ?                \
-  _in_event_loop_ = new bool(false) : _in_event_loop_)
+
+// Since multiple event loops are supported, and fds are assigned
+// to loops, callers must first get the loop based on the fd.
+LoopIndex get_loop(int_fd fd);
 
 
 // Wrapper around function we want to run in the event loop.
 template <typename T>
 void _run_in_event_loop(
-    const lambda::function<Future<T>()>& f,
+    struct ev_loop* loop,
+    const lambda::function<Future<T>(struct ev_loop*)>& f,
     const Owned<Promise<T>>& promise)
 {
   // Don't bother running the function if the future has been discarded.
   if (promise->future().hasDiscard()) {
     promise->discard();
   } else {
-    promise->set(f());
+    promise->set(f(loop));
   }
 }
 
 
-// Helper for running a function in the event loop.
+// Helper for running a function in one of the event loops.
 template <typename T>
-Future<T> run_in_event_loop(const lambda::function<Future<T>()>& f)
+Future<T> run_in_event_loop(
+    const LoopIndex loop_index,
+    const lambda::function<Future<T>(struct ev_loop*)>& f)
 {
-  // If this is already the event loop then just run the function.
-  if (__in_event_loop__) {
-    return f();
+  struct ev_loop* loop = loops[loop_index.index];
+
+  // If this is already the event loop that we're trying to run the
+  // function within, then just run the function.
+  if (_in_event_loop_ == loop) {
+    return f(loop);
   }
 
   Owned<Promise<T>> promise(new Promise<T>());
@@ -75,12 +97,14 @@ Future<T> run_in_event_loop(const lambda::function<Future<T>()>& f)
   Future<T> future = promise->future();
 
   // Enqueue the function.
-  synchronized (functions_mutex) {
-    functions->push(lambda::bind(&_run_in_event_loop<T>, f, promise));
+  {
+    std::lock_guard<std::mutex> guard(functions_mutexes[loop_index.index]);
+    functions[loop_index.index].push(
+        lambda::bind(&_run_in_event_loop<T>, loop, f, promise));
   }
 
   // Interrupt the loop.
-  ev_async_send(loop, &async_watcher);
+  ev_async_send(loop, &async_watchers[loop_index.index]);
 
   return future;
 }
diff --git a/3rdparty/libprocess/src/posix/libev/libev_poll.cpp b/3rdparty/libprocess/src/posix/libev/libev_poll.cpp
index 96913a6..11ef053 100644
--- a/3rdparty/libprocess/src/posix/libev/libev_poll.cpp
+++ b/3rdparty/libprocess/src/posix/libev/libev_poll.cpp
@@ -90,13 +90,13 @@ namespace io {
 namespace internal {
 
 // Helper/continuation of 'poll' on future discard.
-void _poll(const std::shared_ptr<ev_async>& async)
+void _poll(struct ev_loop* loop, const std::shared_ptr<ev_async>& async)
 {
   ev_async_send(loop, async.get());
 }
 
 
-Future<short> poll(int_fd fd, short events)
+Future<short> poll(struct ev_loop* loop, int_fd fd, short events)
 {
   Poll* poll = new Poll();
 
@@ -117,7 +117,7 @@ Future<short> poll(int_fd fd, short events)
   // in this case while we will interrupt the event loop since the
   // async watcher has already been stopped we won't cause
   // 'discard_poll' to get invoked.
-  future.onDiscard(lambda::bind(&_poll, poll->watcher.async));
+  future.onDiscard(lambda::bind(&_poll, loop, poll->watcher.async));
 
   // Initialize and start the I/O watcher.
   ev_io_init(poll->watcher.io.get(), polled, fd, events);
@@ -134,8 +134,9 @@ Future<short> poll(int_fd fd, short events)
   process::initialize();
 
   // TODO(benh): Check if the file descriptor is non-blocking?
-
-  return run_in_event_loop<short>(lambda::bind(&internal::poll, fd, events));
+  return run_in_event_loop<short>(
+      get_loop(fd),
+      lambda::bind(&internal::poll, lambda::_1, fd, events));
 }
 
 } // namespace io {
