http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp 
b/3rdparty/libprocess/src/poll_socket.cpp
deleted file mode 100644
index 74acb69..0000000
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ /dev/null
@@ -1,278 +0,0 @@
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License
-
-
-#ifdef __WINDOWS__
-#include <stout/windows.hpp>
-#else
-#include <netinet/tcp.h>
-#endif // __WINDOWS__
-
-#include <process/io.hpp>
-#include <process/loop.hpp>
-#include <process/network.hpp>
-#include <process/socket.hpp>
-
-#include <stout/os/sendfile.hpp>
-#include <stout/os/strerror.hpp>
-#include <stout/os.hpp>
-
-#include "config.hpp"
-#include "poll_socket.hpp"
-
-using std::string;
-
-namespace process {
-namespace network {
-namespace internal {
-
-Try<std::shared_ptr<SocketImpl>> PollSocketImpl::create(int_fd s)
-{
-  return std::make_shared<PollSocketImpl>(s);
-}
-
-
-Try<Nothing> PollSocketImpl::listen(int backlog)
-{
-  if (::listen(get(), backlog) < 0) {
-    return ErrnoError();
-  }
-  return Nothing();
-}
-
-
-Future<std::shared_ptr<SocketImpl>> PollSocketImpl::accept()
-{
-  // Need to hold a copy of `this` so that the underlying socket
-  // doesn't end up getting reused before we return from the call to
-  // `io::poll` and end up accepting a socket incorrectly.
-  auto self = shared(this);
-
-  return io::poll(get(), io::READ)
-    .then([self]() -> Future<std::shared_ptr<SocketImpl>> {
-      Try<int_fd> accepted = network::accept(self->get());
-      if (accepted.isError()) {
-        return Failure(accepted.error());
-      }
-
-      int_fd s = accepted.get();
-      Try<Nothing> nonblock = os::nonblock(s);
-      if (nonblock.isError()) {
-        os::close(s);
-        return Failure("Failed to accept, nonblock: " + nonblock.error());
-      }
-
-      Try<Nothing> cloexec = os::cloexec(s);
-      if (cloexec.isError()) {
-        os::close(s);
-        return Failure("Failed to accept, cloexec: " + cloexec.error());
-      }
-
-      Try<Address> address = network::address(s);
-      if (address.isError()) {
-        os::close(s);
-        return Failure("Failed to get address: " + address.error());
-      }
-
-      // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
-      // NOTE: We cast to `char*` here because the function prototypes
-      // on Windows use `char*` instead of `void*`.
-      if (address->family() == Address::Family::INET4 ||
-          address->family() == Address::Family::INET6) {
-        int on = 1;
-        if (::setsockopt(
-                s,
-                SOL_TCP,
-                TCP_NODELAY,
-                reinterpret_cast<const char*>(&on),
-                sizeof(on)) < 0) {
-          const string error = os::strerror(errno);
-          os::close(s);
-          return Failure(
-              "Failed to turn off the Nagle algorithm: " + stringify(error));
-        }
-      }
-
-      Try<std::shared_ptr<SocketImpl>> impl = create(s);
-      if (impl.isError()) {
-        os::close(s);
-        return Failure("Failed to create socket: " + impl.error());
-      }
-
-      return impl.get();
-    });
-}
-
-
-Future<Nothing> PollSocketImpl::connect(const Address& address)
-{
-  Try<Nothing, SocketError> connect = network::connect(get(), address);
-  if (connect.isError()) {
-    if (net::is_inprogress_error(connect.error().code)) {
-      // Need to hold a copy of `this` so that the underlying socket
-      // doesn't end up getting reused before we return from the call
-      // to `io::poll` and end up connecting incorrectly.
-      auto self = shared(this);
-
-      return io::poll(get(), io::WRITE)
-        .then([self, address]() -> Future<Nothing> {
-          // Now check that a successful connection was made.
-          int opt;
-          socklen_t optlen = sizeof(opt);
-
-          // NOTE: We cast to `char*` here because the function
-          // prototypes on Windows use `char*` instead of `void*`.
-          if (::getsockopt(
-                  self->get(),
-                  SOL_SOCKET,
-                  SO_ERROR,
-                  reinterpret_cast<char*>(&opt),
-                  &optlen) < 0) {
-            return Failure(SocketError(
-                "Failed to get status of connect to " + stringify(address)));
-          }
-
-          if (opt != 0) {
-            return Failure(SocketError(
-                opt,
-                "Failed to connect to " +
-                stringify(address)));
-          }
-
-          return Nothing();
-        });
-    }
-
-    return Failure(connect.error());
-  }
-
-  return Nothing();
-}
-
-
-Future<size_t> PollSocketImpl::recv(char* data, size_t size)
-{
-  // Need to hold a copy of `this` so that the underlying socket
-  // doesn't end up getting reused before we return from the call to
-  // `io::read` and end up reading data incorrectly.
-  auto self = shared(this);
-
-  return io::read(get(), data, size)
-    .then([self](size_t length) {
-      return length;
-    });
-}
-
-
-Future<size_t> PollSocketImpl::send(const char* data, size_t size)
-{
-  CHECK(size > 0); // TODO(benh): Just return 0 if `size` is 0?
-
-  // Need to hold a copy of `this` so that the underlying socket
-  // doesn't end up getting reused before we return.
-  auto self = shared(this);
-
-  // TODO(benh): Reuse `io::write`? Or is `net::send` and
-  // `MSG_NOSIGNAL` critical here?
-  return loop(
-      None(),
-      [self, data, size]() -> Future<Option<size_t>> {
-        while (true) {
-          ssize_t length = net::send(self->get(), data, size, MSG_NOSIGNAL);
-
-          if (length < 0) {
-#ifdef __WINDOWS__
-            int error = WSAGetLastError();
-#else
-            int error = errno;
-#endif // __WINDOWS__
-
-            if (net::is_restartable_error(error)) {
-              // Interrupted, try again now.
-              continue;
-            } else if (!net::is_retryable_error(error)) {
-              // TODO(benh): Confirm that `os::strerror` does the
-              // right thing for `error` on Windows.
-              VLOG(1) << "Socket error while sending: " << os::strerror(error);
-              return Failure(os::strerror(error));
-            }
-
-            return None();
-          }
-
-          return length;
-        }
-      },
-      [self](const Option<size_t>& length) -> Future<ControlFlow<size_t>> {
-        // Retry after we've polled if we don't yet have a result.
-        if (length.isNone()) {
-          return io::poll(self->get(), io::WRITE)
-            .then([](short event) -> ControlFlow<size_t> {
-              CHECK_EQ(io::WRITE, event);
-              return Continue();
-            });
-        }
-        return Break(length.get());
-      });
-}
-
-
-Future<size_t> PollSocketImpl::sendfile(int_fd fd, off_t offset, size_t size)
-{
-  CHECK(size > 0); // TODO(benh): Just return 0 if `size` is 0?
-
-  // Need to hold a copy of `this` so that the underlying socket
-  // doesn't end up getting reused before we return.
-  auto self = shared(this);
-
-  return loop(
-      None(),
-      [self, fd, offset, size]() -> Future<Option<size_t>> {
-        while (true) {
-          Try<ssize_t, SocketError> length = os::sendfile(
-              self->get(),
-              fd,
-              offset,
-              size);
-
-          if (length.isSome()) {
-            CHECK(length.get() >= 0);
-            return length.get();
-          }
-
-          if (net::is_restartable_error(length.error().code)) {
-            // Interrupted, try again now.
-            continue;
-          } else if (!net::is_retryable_error(length.error().code)) {
-            VLOG(1) << length.error().message;
-            return Failure(length.error());
-          }
-
-          return None();
-        }
-      },
-      [self](const Option<size_t>& length) -> Future<ControlFlow<size_t>> {
-        // Retry after we've polled if we don't yet have a result.
-        if (length.isNone()) {
-          return io::poll(self->get(), io::WRITE)
-            .then([](short event) -> ControlFlow<size_t> {
-              CHECK_EQ(io::WRITE, event);
-              return Continue();
-            });
-        }
-        return Break(length.get());
-      });
-}
-
-} // namespace internal {
-} // namespace network {
-} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/posix/io.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/posix/io.cpp 
b/3rdparty/libprocess/src/posix/io.cpp
new file mode 100644
index 0000000..3862e3b
--- /dev/null
+++ b/3rdparty/libprocess/src/posix/io.cpp
@@ -0,0 +1,140 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#include <process/future.hpp>
+#include <process/io.hpp>
+#include <process/loop.hpp>
+
+#include <stout/error.hpp>
+#include <stout/none.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+
+#include <stout/os/int_fd.hpp>
+#include <stout/os/read.hpp>
+#include <stout/os/socket.hpp>
+#include <stout/os/write.hpp>
+
+#include "io_internal.hpp"
+
+namespace process {
+namespace io {
+namespace internal {
+
+Future<size_t> read(int_fd fd, void* data, size_t size)
+{
+  // TODO(benh): Let the system calls do what ever they're supposed to
+  // rather than return 0 here?
+  if (size == 0) {
+    return 0;
+  }
+
+  return loop(
+      None(),
+      [=]() -> Future<Option<size_t>> {
+        // Because the file descriptor is non-blocking, we call
+        // read()/recv() immediately. If no data is available than
+        // we'll call `poll` and block. We also observed that for some
+        // combination of libev and Linux kernel versions, the poll
+        // would block for non-deterministically long periods of
+        // time. This may be fixed in a newer version of libev (we use
+        // 3.8 at the time of writing this comment).
+        ssize_t length = os::read(fd, data, size);
+        if (length < 0) {
+#ifdef __WINDOWS__
+          WindowsSocketError error;
+#else
+          ErrnoError error;
+#endif // __WINDOWS__
+
+          if (!net::is_restartable_error(error.code) &&
+              !net::is_retryable_error(error.code)) {
+            return Failure(error.message);
+          }
+
+          return None();
+        }
+
+        return length;
+      },
+      [=](const Option<size_t>& length) -> Future<ControlFlow<size_t>> {
+        // Restart/retry if we don't yet have a result.
+        if (length.isNone()) {
+          return io::poll(fd, io::READ)
+            .then([](short event) -> ControlFlow<size_t> {
+              CHECK_EQ(io::READ, event);
+              return Continue();
+            });
+        }
+        return Break(length.get());
+      });
+}
+
+
+Future<size_t> write(int_fd fd, const void* data, size_t size)
+{
+  // TODO(benh): Let the system calls do what ever they're supposed to
+  // rather than return 0 here?
+  if (size == 0) {
+    return 0;
+  }
+
+  return loop(
+      None(),
+      [=]() -> Future<Option<size_t>> {
+        ssize_t length = os::write(fd, data, size);
+
+        if (length < 0) {
+#ifdef __WINDOWS__
+          WindowsSocketError error;
+#else
+          ErrnoError error;
+#endif // __WINDOWS__
+
+          if (!net::is_restartable_error(error.code) &&
+              !net::is_retryable_error(error.code)) {
+            return Failure(error.message);
+          }
+
+          return None();
+        }
+
+        return length;
+      },
+      [=](const Option<size_t>& length) -> Future<ControlFlow<size_t>> {
+        // Restart/retry if we don't yet have a result.
+        if (length.isNone()) {
+          return io::poll(fd, io::WRITE)
+            .then([](short event) -> ControlFlow<size_t> {
+              CHECK_EQ(io::WRITE, event);
+              return Continue();
+            });
+        }
+        return Break(length.get());
+      });
+}
+
+
+Try<Nothing> prepare_async(int_fd fd)
+{
+  return os::nonblock(fd);
+}
+
+
+Try<bool> is_async(int_fd fd)
+{
+  return os::isNonblock(fd);
+}
+
+} // namespace internal {
+} // namespace io {
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/posix/libev/libev.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/posix/libev/libev.cpp 
b/3rdparty/libprocess/src/posix/libev/libev.cpp
new file mode 100644
index 0000000..173ee46
--- /dev/null
+++ b/3rdparty/libprocess/src/posix/libev/libev.cpp
@@ -0,0 +1,166 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#include <ev.h>
+
+#include <mutex>
+#include <queue>
+
+#include <stout/duration.hpp>
+#include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
+
+#include "event_loop.hpp"
+#include "libev.hpp"
+
+namespace process {
+
+ev_async async_watcher;
+// We need an asynchronous watcher to receive the request to shutdown.
+ev_async shutdown_watcher;
+
+// Define the initial values for all of the declarations made in
+// libev.hpp (since these need to live in the static data space).
+struct ev_loop* loop = nullptr;
+
+std::queue<ev_io*>* watchers = new std::queue<ev_io*>();
+
+std::mutex* watchers_mutex = new std::mutex();
+
+std::queue<lambda::function<void()>>* functions =
+  new std::queue<lambda::function<void()>>();
+
+thread_local bool* _in_event_loop_ = nullptr;
+
+
+void handle_async(struct ev_loop* loop, ev_async* _, int revents)
+{
+  std::queue<lambda::function<void()>> run_functions;
+  synchronized (watchers_mutex) {
+    // Start all the new I/O watchers.
+    while (!watchers->empty()) {
+      ev_io* watcher = watchers->front();
+      watchers->pop();
+      ev_io_start(loop, watcher);
+    }
+
+    // Swap the functions into a temporary queue so that we can invoke
+    // them outside of the mutex.
+    std::swap(run_functions, *functions);
+  }
+
+  // Running the functions outside of the mutex reduces locking
+  // contention as these are arbitrary functions that can take a long
+  // time to execute. Doing this also avoids a deadlock scenario where
+  // (A) mutexes are acquired before calling `run_in_event_loop`,
+  // followed by locking (B) `watchers_mutex`. If we executed the
+  // functions inside the mutex, then the locking order violation
+  // would be this function acquiring the (B) `watchers_mutex`
+  // followed by the arbitrary function acquiring the (A) mutexes.
+  while (!run_functions.empty()) {
+    (run_functions.front())();
+    run_functions.pop();
+  }
+}
+
+
+void handle_shutdown(struct ev_loop* loop, ev_async* _, int revents)
+{
+  ev_unloop(loop, EVUNLOOP_ALL);
+}
+
+
+void EventLoop::initialize()
+{
+  loop = ev_default_loop(EVFLAG_AUTO);
+
+  ev_async_init(&async_watcher, handle_async);
+  ev_async_init(&shutdown_watcher, handle_shutdown);
+
+  ev_async_start(loop, &async_watcher);
+  ev_async_start(loop, &shutdown_watcher);
+}
+
+
+namespace internal {
+
+void handle_delay(struct ev_loop* loop, ev_timer* timer, int revents)
+{
+  lambda::function<void()>* function =
+    reinterpret_cast<lambda::function<void()>*>(timer->data);
+  (*function)();
+  delete function;
+  ev_timer_stop(loop, timer);
+  delete timer;
+}
+
+
+Future<Nothing> delay(
+    const Duration& duration,
+    const lambda::function<void()>& function)
+{
+  ev_timer* timer = new ev_timer();
+  timer->data = reinterpret_cast<void*>(new 
lambda::function<void()>(function));
+
+  // Determine the 'after' parameter to pass to libev and set it to 0
+  // in the event that it's negative so that we always make sure to
+  // invoke 'function' even if libev doesn't support negative 'after'
+  // values.
+  double after = duration.secs();
+
+  if (after < 0) {
+    after = 0;
+  }
+
+  const double repeat = 0.0;
+
+  ev_timer_init(timer, handle_delay, after, repeat);
+  ev_timer_start(loop, timer);
+
+  return Nothing();
+}
+
+} // namespace internal {
+
+
+void EventLoop::delay(
+    const Duration& duration,
+    const lambda::function<void()>& function)
+{
+  run_in_event_loop<Nothing>(
+      lambda::bind(&internal::delay, duration, function));
+}
+
+
+double EventLoop::time()
+{
+  // TODO(benh): Versus ev_now()?
+  return ev_time();
+}
+
+
+void EventLoop::run()
+{
+  __in_event_loop__ = true;
+
+  ev_loop(loop, 0);
+
+  __in_event_loop__ = false;
+}
+
+
+void EventLoop::stop()
+{
+  ev_async_send(loop, &shutdown_watcher);
+}
+
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/posix/libev/libev.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/posix/libev/libev.hpp 
b/3rdparty/libprocess/src/posix/libev/libev.hpp
new file mode 100644
index 0000000..d451931
--- /dev/null
+++ b/3rdparty/libprocess/src/posix/libev/libev.hpp
@@ -0,0 +1,96 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#ifndef __LIBEV_HPP__
+#define __LIBEV_HPP__
+
+#include <ev.h>
+
+#include <mutex>
+#include <queue>
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+
+#include <stout/lambda.hpp>
+#include <stout/synchronized.hpp>
+
+namespace process {
+
+// Event loop.
+extern struct ev_loop* loop;
+
+// Asynchronous watcher for interrupting loop to specifically deal
+// with IO watchers and functions (via run_in_event_loop).
+extern ev_async async_watcher;
+
+// Queue of I/O watchers to be asynchronously added to the event loop
+// (protected by 'watchers' below).
+// TODO(benh): Replace this queue with functions that we put in
+// 'functions' below that perform the ev_io_start themselves.
+extern std::queue<ev_io*>* watchers;
+extern std::mutex* watchers_mutex;
+
+// Queue of functions to be invoked asynchronously within the vent
+// loop (protected by 'watchers' above).
+extern std::queue<lambda::function<void()>>* functions;
+
+// Per thread bool pointer. We use a pointer to lazily construct the
+// actual bool.
+extern thread_local bool* _in_event_loop_;
+
+#define __in_event_loop__ *(_in_event_loop_ == nullptr ?                \
+  _in_event_loop_ = new bool(false) : _in_event_loop_)
+
+
+// Wrapper around function we want to run in the event loop.
+template <typename T>
+void _run_in_event_loop(
+    const lambda::function<Future<T>()>& f,
+    const Owned<Promise<T>>& promise)
+{
+  // Don't bother running the function if the future has been discarded.
+  if (promise->future().hasDiscard()) {
+    promise->discard();
+  } else {
+    promise->set(f());
+  }
+}
+
+
+// Helper for running a function in the event loop.
+template <typename T>
+Future<T> run_in_event_loop(const lambda::function<Future<T>()>& f)
+{
+  // If this is already the event loop then just run the function.
+  if (__in_event_loop__) {
+    return f();
+  }
+
+  Owned<Promise<T>> promise(new Promise<T>());
+
+  Future<T> future = promise->future();
+
+  // Enqueue the function.
+  synchronized (watchers_mutex) {
+    functions->push(lambda::bind(&_run_in_event_loop<T>, f, promise));
+  }
+
+  // Interrupt the loop.
+  ev_async_send(loop, &async_watcher);
+
+  return future;
+}
+
+} // namespace process {
+
+#endif // __LIBEV_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/posix/libev/libev_poll.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/posix/libev/libev_poll.cpp 
b/3rdparty/libprocess/src/posix/libev/libev_poll.cpp
new file mode 100644
index 0000000..96913a6
--- /dev/null
+++ b/3rdparty/libprocess/src/posix/libev/libev_poll.cpp
@@ -0,0 +1,142 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#include <ev.h>
+
+#include <memory>
+
+#include <process/future.hpp>
+#include <process/process.hpp> // For process::initialize.
+
+#include <stout/lambda.hpp>
+
+#include "libev.hpp"
+
+namespace process {
+
+// Data necessary for polling so we can discard polling and actually
+// stop it in the event loop.
+struct Poll
+{
+  Poll()
+  {
+    // Need to explicitly instantiate the watchers.
+    watcher.io.reset(new ev_io());
+    watcher.async.reset(new ev_async());
+  }
+
+  // An I/O watcher for checking for readability or writeability and
+  // an async watcher for being able to discard the polling.
+  struct {
+    std::shared_ptr<ev_io> io;
+    std::shared_ptr<ev_async> async;
+  } watcher;
+
+  Promise<short> promise;
+};
+
+
+// Event loop callback when I/O is ready on polling file descriptor.
+void polled(struct ev_loop* loop, ev_io* watcher, int revents)
+{
+  Poll* poll = (Poll*) watcher->data;
+
+  ev_io_stop(loop, poll->watcher.io.get());
+
+  // Stop the async watcher (also clears if pending so 'discard_poll'
+  // will not get invoked and we can delete 'poll' here).
+  ev_async_stop(loop, poll->watcher.async.get());
+
+  poll->promise.set(revents);
+
+  delete poll;
+}
+
+
+// Event loop callback when future associated with polling file
+// descriptor has been discarded.
+void discard_poll(struct ev_loop* loop, ev_async* watcher, int revents)
+{
+  Poll* poll = (Poll*) watcher->data;
+
+  // Check and see if we have a pending 'polled' callback and if so
+  // let it "win".
+  if (ev_is_pending(poll->watcher.io.get())) {
+    return;
+  }
+
+  ev_async_stop(loop, poll->watcher.async.get());
+
+  // Stop the I/O watcher (but note we check if pending above) so it
+  // won't get invoked and we can delete 'poll' here.
+  ev_io_stop(loop, poll->watcher.io.get());
+
+  poll->promise.discard();
+
+  delete poll;
+}
+
+
+namespace io {
+namespace internal {
+
+// Helper/continuation of 'poll' on future discard.
+void _poll(const std::shared_ptr<ev_async>& async)
+{
+  ev_async_send(loop, async.get());
+}
+
+
+Future<short> poll(int_fd fd, short events)
+{
+  Poll* poll = new Poll();
+
+  // Have the watchers data point back to the struct.
+  poll->watcher.async->data = poll;
+  poll->watcher.io->data = poll;
+
+  // Get a copy of the future to avoid any races with the event loop.
+  Future<short> future = poll->promise.future();
+
+  // Initialize and start the async watcher.
+  ev_async_init(poll->watcher.async.get(), discard_poll);
+  ev_async_start(loop, poll->watcher.async.get());
+
+  // Make sure we stop polling if a discard occurs on our future.
+  // Note that it's possible that we'll invoke '_poll' when someone
+  // does a discard even after the polling has already completed, but
+  // in this case while we will interrupt the event loop since the
+  // async watcher has already been stopped we won't cause
+  // 'discard_poll' to get invoked.
+  future.onDiscard(lambda::bind(&_poll, poll->watcher.async));
+
+  // Initialize and start the I/O watcher.
+  ev_io_init(poll->watcher.io.get(), polled, fd, events);
+  ev_io_start(loop, poll->watcher.io.get());
+
+  return future;
+}
+
+} // namespace internal {
+
+
+Future<short> poll(int_fd fd, short events)
+{
+  process::initialize();
+
+  // TODO(benh): Check if the file descriptor is non-blocking?
+
+  return run_in_event_loop<short>(lambda::bind(&internal::poll, fd, events));
+}
+
+} // namespace io {
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/posix/libevent/libevent.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/posix/libevent/libevent.cpp 
b/3rdparty/libprocess/src/posix/libevent/libevent.cpp
new file mode 100644
index 0000000..fb595bc
--- /dev/null
+++ b/3rdparty/libprocess/src/posix/libevent/libevent.cpp
@@ -0,0 +1,211 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#ifndef __WINDOWS__
+#include <unistd.h>
+#endif // __WINDOWS__
+
+#include <mutex>
+
+#include <event2/event.h>
+#include <event2/thread.h>
+#include <event2/util.h>
+
+#include <process/logging.hpp>
+#include <process/once.hpp>
+
+#include <stout/synchronized.hpp>
+
+#include "event_loop.hpp"
+#include "libevent.hpp"
+
+namespace process {
+
+event_base* base = nullptr;
+
+
+static std::mutex* functions_mutex = new std::mutex();
+std::queue<lambda::function<void()>>* functions =
+  new std::queue<lambda::function<void()>>();
+
+
+thread_local bool* _in_event_loop_ = nullptr;
+
+
+void async_function(evutil_socket_t socket, short which, void* arg)
+{
+  event* ev = reinterpret_cast<event*>(arg);
+  event_free(ev);
+
+  std::queue<lambda::function<void()>> q;
+
+  synchronized (functions_mutex) {
+    std::swap(q, *functions);
+  }
+
+  while (!q.empty()) {
+    q.front()();
+    q.pop();
+  }
+}
+
+
+void run_in_event_loop(
+    const lambda::function<void()>& f,
+    EventLoopLogicFlow event_loop_logic_flow)
+{
+  if (__in_event_loop__ && event_loop_logic_flow == ALLOW_SHORT_CIRCUIT) {
+    f();
+    return;
+  }
+
+  synchronized (functions_mutex) {
+    functions->push(f);
+
+    // Add an event and activate it to interrupt the event loop.
+    // TODO(jmlvanre): after libevent v 2.1 we can use
+    // event_self_cbarg instead of re-assigning the event. For now we
+    // manually re-assign the event to pass in the pointer to the
+    // event itself as the callback argument.
+    event* ev = evtimer_new(base, async_function, nullptr);
+
+    // 'event_assign' is only valid on non-pending AND non-active
+    // events. This means we have to assign the callback before
+    // calling 'event_active'.
+    if (evtimer_assign(ev, base, async_function, ev) < 0) {
+      LOG(FATAL) << "Failed to assign callback on event";
+    }
+
+    event_active(ev, EV_TIMEOUT, 0);
+  }
+}
+
+
+void EventLoop::run()
+{
+  __in_event_loop__ = true;
+
+  do {
+    int result = event_base_loop(base, EVLOOP_ONCE);
+    if (result < 0) {
+      LOG(FATAL) << "Failed to run event loop";
+    } else if (result > 0) {
+      // All events are handled, continue event loop.
+      continue;
+    } else {
+      CHECK_EQ(0, result);
+      if (event_base_got_break(base)) {
+        break;
+      } else if (event_base_got_exit(base)) {
+        break;
+      }
+    }
+  } while (true);
+
+  __in_event_loop__ = false;
+}
+
+
+void EventLoop::stop()
+{
+  event_base_loopexit(base, nullptr);
+}
+
+
+namespace internal {
+
+struct Delay
+{
+  lambda::function<void()> function;
+  event* timer;
+};
+
+void handle_delay(evutil_socket_t, short, void* arg)
+{
+  Delay* delay = reinterpret_cast<Delay*>(arg);
+  delay->function();
+  event_free(delay->timer);
+  delete delay;
+}
+
+}  // namespace internal {
+
+
+void EventLoop::delay(
+    const Duration& duration,
+    const lambda::function<void()>& function)
+{
+  internal::Delay* delay = new internal::Delay();
+  delay->timer = evtimer_new(base, &internal::handle_delay, delay);
+  if (delay->timer == nullptr) {
+    LOG(FATAL) << "Failed to delay, evtimer_new";
+  }
+
+  delay->function = function;
+
+  timeval t{0, 0};
+  if (duration > Seconds(0)) {
+    t = duration.timeval();
+  }
+
+  evtimer_add(delay->timer, &t);
+}
+
+
+double EventLoop::time()
+{
+  // We explicitly call `evutil_gettimeofday()` for now to avoid any
+  // issues that may be introduced by using the cached value provided
+  // by `event_base_gettimeofday_cached()`. Since a lot of logic in
+  // libprocess depends on time math, we want to log fatal rather than
+  // cause logic errors if the time fails.
+  timeval t;
+  if (evutil_gettimeofday(&t, nullptr) < 0) {
+    LOG(FATAL) << "Failed to get time, evutil_gettimeofday";
+  }
+
+  return Duration(t).secs();
+}
+
+
+void EventLoop::initialize()
+{
+  static Once* initialized = new Once();
+
+  if (initialized->once()) {
+    return;
+  }
+
+  // We need to initialize Libevent differently depending on the
+  // operating system threading support.
+#if defined(EVTHREAD_USE_PTHREADS_IMPLEMENTED)
+  if (evthread_use_pthreads() < 0) {
+    LOG(FATAL) << "Failed to initialize, evthread_use_pthreads";
+  }
+#elif defined(EVTHREAD_USE_WINDOWS_THREADS_IMPLEMENTED)
+  if (evthread_use_windows_threads() < 0) {
+    LOG(FATAL) << "Failed to initialize, evthread_use_windows_threads";
+  }
+#else
+#error "Libevent must be compiled with either pthread or Windows thread 
support"
+#endif
+
+  base = event_base_new();
+
+  if (base == nullptr) {
+    LOG(FATAL) << "Failed to initialize, event_base_new";
+  }
+
+  initialized->done();
+}
+
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/posix/libevent/libevent.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/posix/libevent/libevent.hpp 
b/3rdparty/libprocess/src/posix/libevent/libevent.hpp
new file mode 100644
index 0000000..2eb9790
--- /dev/null
+++ b/3rdparty/libprocess/src/posix/libevent/libevent.hpp
@@ -0,0 +1,48 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#ifndef __LIBEVENT_HPP__
+#define __LIBEVENT_HPP__
+
+#include <event2/event.h>
+
+#include <stout/lambda.hpp>
+
+namespace process {
+
+// Event loop.
+extern event_base* base;
+
+
+// Per thread bool pointer. We use a pointer to lazily construct the
+// actual bool.
+extern thread_local bool* _in_event_loop_;
+
+
+#define __in_event_loop__ *(_in_event_loop_ == nullptr ?                \
+  _in_event_loop_ = new bool(false) : _in_event_loop_)
+
+
+enum EventLoopLogicFlow
+{
+  ALLOW_SHORT_CIRCUIT,
+  DISALLOW_SHORT_CIRCUIT
+};
+
+
+void run_in_event_loop(
+    const lambda::function<void()>& f,
+    EventLoopLogicFlow event_loop_logic_flow = ALLOW_SHORT_CIRCUIT);
+
+} // namespace process {
+
+#endif // __LIBEVENT_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/posix/libevent/libevent_poll.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/posix/libevent/libevent_poll.cpp 
b/3rdparty/libprocess/src/posix/libevent/libevent_poll.cpp
new file mode 100644
index 0000000..038dde2
--- /dev/null
+++ b/3rdparty/libprocess/src/posix/libevent/libevent_poll.cpp
@@ -0,0 +1,112 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#include <event2/event.h>
+
+#include <memory>
+
+#include <process/future.hpp>
+#include <process/io.hpp>
+#include <process/process.hpp> // For process::initialize.
+
+#include "libevent.hpp"
+
+namespace process {
+
+namespace io {
+namespace internal {
+
+struct Poll
+{
+  Promise<short> promise;
+  std::shared_ptr<event> ev;
+};
+
+
+void pollCallback(evutil_socket_t, short what, void* arg)
+{
+  Poll* poll = reinterpret_cast<Poll*>(arg);
+
+  if (poll->promise.future().hasDiscard()) {
+    poll->promise.discard();
+  } else {
+    // Convert libevent specific EV_READ / EV_WRITE to io::* specific
+    // values of these enumerations.
+    short events =
+      ((what & EV_READ) ? io::READ : 0) | ((what & EV_WRITE) ? io::WRITE : 0);
+
+    poll->promise.set(events);
+  }
+
+  // Deleting the `poll` also destructs `ev` and hence triggers `event_free`,
+  // which makes the event non-pending.
+  delete poll;
+}
+
+
+void pollDiscard(const std::weak_ptr<event>& ev, short events)
+{
+  // Discarding inside the event loop prevents `pollCallback()` from being
+  // called twice if the future is discarded.
+  run_in_event_loop([=]() {
+    std::shared_ptr<event> shared = ev.lock();
+    // If `ev` cannot be locked `pollCallback` already ran. If it was locked
+    // but not pending, `pollCallback` is scheduled to be executed.
+    if (static_cast<bool>(shared) &&
+        event_pending(shared.get(), events, nullptr)) {
+      // `event_active` will trigger the `pollCallback` to be executed.
+      event_active(shared.get(), EV_READ, 0);
+    }
+  });
+}
+
+} // namespace internal {
+
+
+Future<short> poll(int_fd fd, short events)
+{
+  process::initialize();
+
+  internal::Poll* poll = new internal::Poll();
+
+  Future<short> future = poll->promise.future();
+
+  // Convert io::READ / io::WRITE to libevent specific values of these
+  // enumerations.
+  short what =
+    ((events & io::READ) ? EV_READ : 0) | ((events & io::WRITE) ? EV_WRITE : 
0);
+
+  // Bind `event_free` to the destructor of the `ev` shared pointer
+  // guaranteeing that the event will be freed only once.
+  poll->ev.reset(
+      event_new(base, fd, what, &internal::pollCallback, poll),
+      event_free);
+
+  if (poll->ev == nullptr) {
+    LOG(FATAL) << "Failed to poll, event_new";
+  }
+
+  // Using a `weak_ptr` prevents `ev` to become a dangling pointer if
+  // the returned future is discarded after the event is triggered.
+  // The `weak_ptr` needs to be created before `event_add` in case
+  // the event is ready and the callback is executed before creating
+  // `ev`.
+  std::weak_ptr<event> ev(poll->ev);
+
+  event_add(poll->ev.get(), nullptr);
+
+  return future
+    .onDiscard(lambda::bind(&internal::pollDiscard, ev, what));
+}
+
+} // namespace io {
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/posix/libevent/libevent_ssl_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/posix/libevent/libevent_ssl_socket.cpp 
b/3rdparty/libprocess/src/posix/libevent/libevent_ssl_socket.cpp
new file mode 100644
index 0000000..436b389
--- /dev/null
+++ b/3rdparty/libprocess/src/posix/libevent/libevent_ssl_socket.cpp
@@ -0,0 +1,1249 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#include <event2/buffer.h>
+#include <event2/bufferevent_ssl.h>
+#include <event2/event.h>
+#include <event2/listener.h>
+#include <event2/thread.h>
+#include <event2/util.h>
+
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+
+#include <process/queue.hpp>
+#include <process/socket.hpp>
+
+#include <process/ssl/flags.hpp>
+
+#include <stout/net.hpp>
+#include <stout/synchronized.hpp>
+
+#include <stout/os/close.hpp>
+#include <stout/os/dup.hpp>
+#include <stout/os/fcntl.hpp>
+
+#include "libevent.hpp"
+#include "libevent_ssl_socket.hpp"
+#include "openssl.hpp"
+#include "poll_socket.hpp"
+
+// Locking:
+//
+// We use the BEV_OPT_THREADSAFE flag when constructing bufferevents
+// so that all **functions that are called from the event loop that
+// take a bufferevent as a parameter will automatically have the
+// lock acquired**.
+//
+// This means that everywhere that the libevent library does not
+// already lock the bev, we need to manually 'synchronize (bev) {'.
+// To further complicate matters, due to a deadlock scneario in
+// libevent-openssl (v 2.0.21) we currently modify bufferevents using
+// continuations in the event loop, but these functions, while run
+// from within the event loop, are not passed the 'bev' as a parameter
+// and thus MUST use 'synchronized (bev)'. See 'Continuation' comment
+// below for more details on why we need to invoke these continuations
+// from within the event loop.
+
+// Continuations via 'run_in_event_loop(...)':
+//
+// There is a deadlock scenario in libevent-openssl (v 2.0.21) when
+// modifying the bufferevent (bev) from another thread (not the event
+// loop). To avoid this we run all bufferevent manipulation logic in
+// continuations that are executed within the event loop.
+
+// DISALLOW_SHORT_CIRCUIT:
+//
+// We disallow short-circuiting in 'run_in_event_loop' due to a bug in
+// libevent_openssl with deferred callbacks still being called (still
+// in the run queue) even though a bev has been disabled.
+
+using std::queue;
+using std::string;
+
+// Specialization of 'synchronize' to use bufferevent with the
+// 'synchronized' macro.
+static Synchronized<bufferevent> synchronize(bufferevent* bev)
+{
+  return Synchronized<bufferevent>(
+      bev,
+      [](bufferevent* bev) { bufferevent_lock(bev); },
+      [](bufferevent* bev) { bufferevent_unlock(bev); });
+}
+
+namespace process {
+namespace network {
+namespace internal {
+
+Try<std::shared_ptr<SocketImpl>> LibeventSSLSocketImpl::create(int_fd s)
+{
+  openssl::initialize();
+
+  if (!openssl::flags().enabled) {
+    return Error("SSL is disabled");
+  }
+
+  auto socket = std::make_shared<LibeventSSLSocketImpl>(s);
+  // See comment at 'initialize' declaration for why we call this.
+  socket->initialize();
+  return socket;
+}
+
+
+LibeventSSLSocketImpl::~LibeventSSLSocketImpl()
+{
+  // We defer termination and destruction of all event loop specific
+  // calls and structures. This is a safety against the socket being
+  // destroyed before existing event loop calls have completed since
+  // they require valid data structures (the weak pointer).
+  //
+  // Release ownership of the file descriptor so that
+  // we can defer closing the socket.
+  int_fd fd = release();
+  CHECK(fd >= 0);
+
+  evconnlistener* _listener = listener;
+  bufferevent* _bev = bev;
+  std::weak_ptr<LibeventSSLSocketImpl>* _event_loop_handle = event_loop_handle;
+
+  run_in_event_loop(
+      [_listener, _bev, _event_loop_handle, fd]() {
+        // Once this lambda is called, it should not be possible for
+        // more event loop callbacks to be triggered with 'this->bev'.
+        // This is important because we delete event_loop_handle which
+        // is the callback argument for any event loop callbacks.
+        // This lambda is responsible for ensuring 'this->bev' is
+        // disabled, and cleaning up any remaining state associated
+        // with the event loop.
+
+        CHECK(__in_event_loop__);
+
+        if (_listener != nullptr) {
+          evconnlistener_free(_listener);
+        }
+
+        if (_bev != nullptr) {
+          // NOTE: Removes all future callbacks using 'this->bev'.
+          bufferevent_disable(_bev, EV_READ | EV_WRITE);
+
+          SSL* ssl = bufferevent_openssl_get_ssl(_bev);
+          SSL_free(ssl);
+          bufferevent_free(_bev);
+        }
+
+        CHECK_SOME(os::close(fd)) << "Failed to close socket";
+
+        delete _event_loop_handle;
+      },
+      DISALLOW_SHORT_CIRCUIT);
+}
+
+
+void LibeventSSLSocketImpl::initialize()
+{
+  event_loop_handle = new std::weak_ptr<LibeventSSLSocketImpl>(shared(this));
+}
+
+
+Try<Nothing, SocketError> LibeventSSLSocketImpl::shutdown(int how)
+{
+  // Nothing to do if this socket was never initialized.
+  synchronized (lock) {
+    if (bev == nullptr) {
+      // If it was not initialized, then there should also be no
+      // requests.
+      CHECK(connect_request.get() == nullptr);
+      CHECK(recv_request.get() == nullptr);
+      CHECK(send_request.get() == nullptr);
+
+      // We expect this to fail and generate an 'ENOTCONN' failure as
+      // no connection should exist at this point.
+      if (::shutdown(s, how) < 0) {
+        return SocketError();
+      }
+
+      return Nothing();
+    }
+  }
+
+  // Extend the life-time of 'this' through the execution of the
+  // lambda in the event loop. Note: The 'self' needs to be explicitly
+  // captured because we're not using it in the body of the lambda. We
+  // can use a 'shared_ptr' because run_in_event_loop is guaranteed to
+  // execute.
+  auto self = shared(this);
+
+  run_in_event_loop(
+      [self]() {
+        CHECK(__in_event_loop__);
+        CHECK(self);
+
+        CHECK_NOTNULL(self->bev);
+
+        synchronized (self->bev) {
+          Owned<RecvRequest> request;
+
+          // Swap the 'recv_request' under the object lock.
+          synchronized (self->lock) {
+            std::swap(request, self->recv_request);
+          }
+
+          // If there is still a pending receive request then close it.
+          if (request.get() != nullptr) {
+            request->promise
+              .set(bufferevent_read(self->bev, request->data, request->size));
+          }
+
+          // Workaround for SSL shutdown, see 
http://www.wangafu.net/~nickm/libevent-book/Ref6a_advanced_bufferevents.html // 
NOLINT
+          SSL* ssl = bufferevent_openssl_get_ssl(self->bev);
+          SSL_set_shutdown(ssl, SSL_RECEIVED_SHUTDOWN);
+          SSL_shutdown(ssl);
+        }
+      },
+      DISALLOW_SHORT_CIRCUIT);
+
+  return Nothing();
+}
+
+
+// Only runs in event loop. No locks required. See 'Locking' note at
+// top of file.
+void LibeventSSLSocketImpl::recv_callback(bufferevent* /*bev*/, void* arg)
+{
+  CHECK(__in_event_loop__);
+
+  std::weak_ptr<LibeventSSLSocketImpl>* handle =
+    
reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>(CHECK_NOTNULL(arg));
+
+  std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock());
+
+  // Don't call the 'recv_callback' unless the socket is still valid.
+  if (impl != nullptr) {
+    impl->recv_callback();
+  }
+}
+
+
+// Only runs in event loop. Member function continuation of static
+// 'recv_callback'. This function can be called from two places -
+// a) `LibeventSSLSocketImpl::recv` when a new Socket::recv is called and there
+//    is buffer available to read.
+// b) `LibeventSSLSocketImpl::recv_callback when libevent calls the deferred
+//    read callback.
+void LibeventSSLSocketImpl::recv_callback()
+{
+  CHECK(__in_event_loop__);
+
+  Owned<RecvRequest> request;
+
+  const size_t buffer_length = evbuffer_get_length(bufferevent_get_input(bev));
+
+  // Swap out the request object IFF there is buffer available to read. We 
check
+  // this here because it is possible that when the libevent deferred callback
+  // was called, a Socket::recv context already read the buffer from the event.
+  // Following sequence is possible:
+  // a. libevent finds a buffer ready to be read.
+  // b. libevent queues buffer event to be dispatched.
+  // c. Socket::recv is called that creates a new request.
+  // d. Socket::recv finds buffer length > 0.
+  // e. Socket::recv reads the buffer.
+  // f. A new request Socket::recv is called which creates a new request.
+  // g. libevent callback is called for the event queued at step b.
+  // h. libevent callback finds the length of the buffer as 0 but the request 
is
+  //    a non-nullptr due to step f.
+  if (buffer_length > 0 || received_eof) {
+    synchronized (lock) {
+      std::swap(request, recv_request);
+    }
+  }
+
+  if (request.get() != nullptr) {
+    if (buffer_length > 0) {
+      size_t length = bufferevent_read(bev, request->data, request->size);
+      CHECK(length > 0);
+
+      request->promise.set(length);
+    } else {
+      CHECK(received_eof);
+      request->promise.set(0);
+    }
+  }
+}
+
+
+// Only runs in event loop. No locks required. See 'Locking' note at
+// top of file.
+void LibeventSSLSocketImpl::send_callback(bufferevent* /*bev*/, void* arg)
+{
+  CHECK(__in_event_loop__);
+
+  std::weak_ptr<LibeventSSLSocketImpl>* handle =
+    
reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>(CHECK_NOTNULL(arg));
+
+  std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock());
+
+  // Don't call the 'send_callback' unless the socket is still valid.
+  if (impl != nullptr) {
+    impl->send_callback();
+  }
+}
+
+
+// Only runs in event loop. Member function continuation of static
+// 'recv_callback'.
+void LibeventSSLSocketImpl::send_callback()
+{
+  CHECK(__in_event_loop__);
+
+  Owned<SendRequest> request;
+
+  synchronized (lock) {
+    std::swap(request, send_request);
+  }
+
+  if (request.get() != nullptr) {
+    request->promise.set(request->size);
+  }
+}
+
+
+// Only runs in event loop. No locks required. See 'Locking' note at
+// top of file.
+void LibeventSSLSocketImpl::event_callback(
+    bufferevent* /*bev*/,
+    short events,
+    void* arg)
+{
+  CHECK(__in_event_loop__);
+
+  std::weak_ptr<LibeventSSLSocketImpl>* handle =
+    
reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>(CHECK_NOTNULL(arg));
+
+  std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock());
+
+  // Don't call the 'event_callback' unless the socket is still valid.
+  if (impl != nullptr) {
+    impl->event_callback(events);
+  }
+}
+
+
+// Only runs in event loop. Member function continuation of static
+// 'recv_callback'.
+void LibeventSSLSocketImpl::event_callback(short events)
+{
+  CHECK(__in_event_loop__);
+
+  // TODO(bmahler): Libevent's invariant is that `events` contains:
+  //
+  //   (1) one of BEV_EVENT_READING or BEV_EVENT_WRITING to
+  //       indicate whether the event was on the read or write path.
+  //
+  //   (2) one of BEV_EVENT_EOF, BEV_EVENT_ERROR, BEV_EVENT_TIMEOUT,
+  //       BEV_EVENT_CONNECTED.
+  //
+  // (1) allows us to handle read and write errors separately.
+  // HOWEVER, for SSL bufferevents in 2.0.x, libevent never seems
+  // to tell us about BEV_EVENT_READING or BEV_EVENT_WRITING,
+  // which forces us to write incorrect logic here by treating all
+  // events as affecting both reads and writes.
+  //
+  // This has been fixed in 2.1.x:
+  //   2.1 "What's New":
+  //     
https://github.com/libevent/libevent/blob/release-2.1.8-stable/whatsnew-2.1.txt#L333-L335
 // NOLINT
+  //   Commit:
+  //     https://github.com/libevent/libevent/commit/f7eb69ace
+  //
+  // We should require 2.1.x so that we can correctly distinguish
+  // between the read and write errors, and not have two code paths
+  // depending on the libevent version, see MESOS-5999, MESOS-6770.
+
+  Owned<RecvRequest> current_recv_request;
+  Owned<SendRequest> current_send_request;
+  Owned<ConnectRequest> current_connect_request;
+
+  if (events & BEV_EVENT_EOF ||
+      events & BEV_EVENT_CONNECTED ||
+      events & BEV_EVENT_ERROR) {
+    synchronized (lock) {
+      std::swap(current_recv_request, recv_request);
+      std::swap(current_send_request, send_request);
+      std::swap(current_connect_request, connect_request);
+    }
+  }
+
+  // First handle EOF, we also look for `BEV_EVENT_ERROR` with
+  // `EVUTIL_SOCKET_ERROR() == 0` since this occurs as a result
+  // of a "dirty" SSL shutdown (i.e. TCP close before SSL close)
+  // or when this socket has been shut down and further sends
+  // are performed.
+  //
+  // TODO(bmahler): We don't expose "dirty" SSL shutdowns as
+  // recv errors, but perhaps we should?
+  if (events & BEV_EVENT_EOF ||
+     (events & BEV_EVENT_ERROR && EVUTIL_SOCKET_ERROR() == 0)) {
+    received_eof = true;
+
+    if (current_recv_request.get() != nullptr) {
+      // Drain any remaining data from the bufferevent or complete the
+      // promise with 0 to signify EOF. Because we set `received_eof`,
+      // subsequent calls to `recv` will return 0 if there is no data
+      // remaining on the buffer.
+      if (evbuffer_get_length(bufferevent_get_input(bev)) > 0) {
+        size_t length =
+          bufferevent_read(
+              bev,
+              current_recv_request->data,
+              current_recv_request->size);
+        CHECK(length > 0);
+
+        current_recv_request->promise.set(length);
+      } else {
+        current_recv_request->promise.set(0);
+      }
+    }
+
+    if (current_send_request.get() != nullptr) {
+      current_send_request->promise.fail("Failed send: connection closed");
+    }
+
+    if (current_connect_request.get() != nullptr) {
+      SSL* ssl = bufferevent_openssl_get_ssl(CHECK_NOTNULL(bev));
+      SSL_free(ssl);
+      bufferevent_free(CHECK_NOTNULL(bev));
+      bev = nullptr;
+      current_connect_request->promise.fail(
+          "Failed connect: connection closed");
+    }
+  } else if (events & BEV_EVENT_CONNECTED) {
+    // We should not have receiving or sending request while still
+    // connecting.
+    CHECK(current_recv_request.get() == nullptr);
+    CHECK(current_send_request.get() == nullptr);
+    CHECK_NOTNULL(current_connect_request.get());
+
+    // If we're connecting, then we've succeeded. Time to do
+    // post-verification.
+    CHECK_NOTNULL(bev);
+
+    // Do post-validation of connection.
+    SSL* ssl = bufferevent_openssl_get_ssl(bev);
+
+    Try<Nothing> verify = openssl::verify(ssl, peer_hostname, peer_ip);
+    if (verify.isError()) {
+      VLOG(1) << "Failed connect, verification error: " << verify.error();
+      SSL_free(ssl);
+      bufferevent_free(bev);
+      bev = nullptr;
+      current_connect_request->promise.fail(verify.error());
+      return;
+    }
+
+    current_connect_request->promise.set(Nothing());
+  } else if (events & BEV_EVENT_ERROR) {
+    CHECK(EVUTIL_SOCKET_ERROR() != 0);
+    std::ostringstream error_stream;
+    error_stream << evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR());
+
+    // If there is a valid error, fail any requests and log the error.
+    VLOG(1) << "Socket error: " << error_stream.str();
+
+    if (current_recv_request.get() != nullptr) {
+      current_recv_request->promise.fail(
+          "Failed recv, connection error: " +
+          error_stream.str());
+    }
+
+    if (current_send_request.get() != nullptr) {
+      current_send_request->promise.fail(
+          "Failed send, connection error: " +
+          error_stream.str());
+    }
+
+    if (current_connect_request.get() != nullptr) {
+      SSL* ssl = bufferevent_openssl_get_ssl(CHECK_NOTNULL(bev));
+      SSL_free(ssl);
+      bufferevent_free(CHECK_NOTNULL(bev));
+      bev = nullptr;
+      current_connect_request->promise.fail(
+          "Failed connect, connection error: " +
+          error_stream.str());
+    }
+  }
+}
+
+
+LibeventSSLSocketImpl::LibeventSSLSocketImpl(int_fd _s)
+  : SocketImpl(_s),
+    bev(nullptr),
+    listener(nullptr),
+    recv_request(nullptr),
+    send_request(nullptr),
+    connect_request(nullptr),
+    event_loop_handle(nullptr) {}
+
+
+LibeventSSLSocketImpl::LibeventSSLSocketImpl(
+    int_fd _s,
+    bufferevent* _bev,
+    Option<string>&& _peer_hostname)
+  : SocketImpl(_s),
+    bev(_bev),
+    listener(nullptr),
+    recv_request(nullptr),
+    send_request(nullptr),
+    connect_request(nullptr),
+    event_loop_handle(nullptr),
+    peer_hostname(std::move(_peer_hostname)) {}
+
+
+Future<Nothing> LibeventSSLSocketImpl::connect(const Address& address)
+{
+  if (bev != nullptr) {
+    return Failure("Socket is already connected");
+  }
+
+  if (connect_request.get() != nullptr) {
+    return Failure("Socket is already connecting");
+  }
+
+  SSL* ssl = SSL_new(openssl::context());
+  if (ssl == nullptr) {
+    return Failure("Failed to connect: SSL_new");
+  }
+
+  // Construct the bufferevent in the connecting state.
+  // We set 'BEV_OPT_DEFER_CALLBACKS' to avoid calling the
+  // 'event_callback' before 'bufferevent_socket_connect' returns.
+  CHECK(bev == nullptr);
+  bev = bufferevent_openssl_socket_new(
+      base,
+      s,
+      ssl,
+      BUFFEREVENT_SSL_CONNECTING,
+      BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS);
+
+  if (bev == nullptr) {
+    // We need to free 'ssl' here because the bev won't clean it up
+    // for us.
+    SSL_free(ssl);
+    return Failure("Failed to connect: bufferevent_openssl_socket_new");
+  }
+
+  if (address.family() == Address::Family::INET4 ||
+      address.family() == Address::Family::INET6) {
+    // Try and determine the 'peer_hostname' from the address we're
+    // connecting to in order to properly verify the certificate
+    // later.
+    const Try<string> hostname =
+      network::convert<inet::Address>(address)->hostname();
+
+    if (hostname.isError()) {
+      VLOG(2) << "Could not determine hostname of peer: " << hostname.error();
+    } else {
+      VLOG(2) << "Connecting to " << hostname.get();
+      peer_hostname = hostname.get();
+    }
+
+    // Determine the 'peer_ip' from the address we're connecting to in
+    // order to properly verify the certificate later.
+    peer_ip = network::convert<inet::Address>(address)->ip;
+  }
+
+  // Optimistically construct a 'ConnectRequest' and future.
+  Owned<ConnectRequest> request(new ConnectRequest());
+  Future<Nothing> future = request->promise.future();
+
+  // Assign 'connect_request' under lock, fail on error.
+  synchronized (lock) {
+    if (connect_request.get() != nullptr) {
+      SSL_free(ssl);
+      bufferevent_free(bev);
+      bev = nullptr;
+      return Failure("Socket is already connecting");
+    }
+    std::swap(request, connect_request);
+  }
+
+  // Extend the life-time of 'this' through the execution of the
+  // lambda in the event loop. Note: The 'self' needs to be explicitly
+  // captured because we're not using it in the body of the lambda. We
+  // can use a 'shared_ptr' because run_in_event_loop is guaranteed to
+  // execute.
+  auto self = shared(this);
+
+  run_in_event_loop(
+      [self, address]() {
+        sockaddr_storage addr = address;
+
+          // Assign the callbacks for the bufferevent. We do this
+          // before the 'bufferevent_socket_connect()' call to avoid
+          // any race on the underlying buffer events becoming ready.
+          bufferevent_setcb(
+              self->bev,
+              &LibeventSSLSocketImpl::recv_callback,
+              &LibeventSSLSocketImpl::send_callback,
+              &LibeventSSLSocketImpl::event_callback,
+              CHECK_NOTNULL(self->event_loop_handle));
+
+          if (bufferevent_socket_connect(
+                  self->bev,
+                  reinterpret_cast<sockaddr*>(&addr),
+                  address.size()) < 0) {
+            SSL* ssl = bufferevent_openssl_get_ssl(CHECK_NOTNULL(self->bev));
+            SSL_free(ssl);
+            bufferevent_free(self->bev);
+            self->bev = nullptr;
+
+            Owned<ConnectRequest> request;
+
+            // Swap out the 'connect_request' so we can destroy it
+            // outside of the lock.
+            synchronized (self->lock) {
+              std::swap(request, self->connect_request);
+            }
+
+            CHECK_NOTNULL(request.get());
+
+            // Fail the promise since we failed to connect.
+            request->promise.fail(
+                "Failed to connect: bufferevent_socket_connect");
+          }
+      },
+      DISALLOW_SHORT_CIRCUIT);
+
+  return future;
+}
+
+
+Future<size_t> LibeventSSLSocketImpl::recv(char* data, size_t size)
+{
+  // Optimistically construct a 'RecvRequest' and future.
+  Owned<RecvRequest> request(new RecvRequest(data, size));
+  std::weak_ptr<LibeventSSLSocketImpl> weak_self(shared(this));
+
+  // If the user of the future decides to 'discard', then we want to
+  // test whether the request was already satisfied.
+  // We capture a 'weak_ptr' to 'this' (as opposed to a 'shared_ptr')
+  // because the socket could be destroyed before this lambda is
+  // executed. If we used a 'shared_ptr' then this lambda could extend
+  // the life-time of 'this' unnecessarily.
+  Future<size_t> future = request->promise.future()
+    .onDiscard([weak_self]() {
+      // Extend the life-time of 'this' through the execution of the
+      // lambda in the event loop. Note: The 'self' needs to be
+      // explicitly captured because we're not using it in the body of
+      // the lambda. We can use a 'shared_ptr' because
+      // run_in_event_loop is guaranteed to execute.
+      std::shared_ptr<LibeventSSLSocketImpl> self(weak_self.lock());
+
+      if (self != nullptr) {
+        run_in_event_loop(
+            [self]() {
+              CHECK(__in_event_loop__);
+              CHECK(self);
+
+              Owned<RecvRequest> request;
+
+              synchronized (self->lock) {
+                std::swap(request, self->recv_request);
+              }
+
+              // Only discard if the request hasn't already been
+              // satisfied.
+              if (request.get() != nullptr) {
+                // Discard the promise outside of the object lock as
+                // the callbacks can be expensive.
+                request->promise.discard();
+              }
+            },
+            DISALLOW_SHORT_CIRCUIT);
+      }
+    });
+
+  // Assign 'recv_request' under lock, fail on error.
+  synchronized (lock) {
+    if (recv_request.get() != nullptr) {
+      return Failure("Socket is already receiving");
+    }
+    std::swap(request, recv_request);
+  }
+
+  // Extend the life-time of 'this' through the execution of the
+  // lambda in the event loop. Note: The 'self' needs to be explicitly
+  // captured because we're not using it in the body of the lambda. We
+  // can use a 'shared_ptr' because run_in_event_loop is guaranteed to
+  // execute.
+  auto self = shared(this);
+
+  run_in_event_loop(
+      [self]() {
+        CHECK(__in_event_loop__);
+        CHECK(self);
+
+        bool recv = false;
+
+        // We check to see if 'recv_request' is null. It would be null
+        // if a 'discard' happened before this lambda was executed.
+        synchronized (self->lock) {
+          recv = self->recv_request.get() != nullptr;
+        }
+
+        // Only try to read existing data from the bufferevent if the
+        // request has not already been discarded.
+        if (recv) {
+          synchronized (self->bev) {
+            evbuffer* input = bufferevent_get_input(self->bev);
+            size_t length = evbuffer_get_length(input);
+
+            // If there is already data in the buffer or an EOF has
+            // been received, fulfill the 'recv_request' by calling
+            // 'recv_callback()'. Otherwise do nothing and wait for
+            // the 'recv_callback' to run when we receive data over
+            // the network.
+            if (length > 0 || self->received_eof) {
+              self->recv_callback();
+            }
+          }
+        }
+      },
+      DISALLOW_SHORT_CIRCUIT);
+
+  return future;
+}
+
+
+Future<size_t> LibeventSSLSocketImpl::send(const char* data, size_t size)
+{
+  // Optimistically construct a 'SendRequest' and future.
+  Owned<SendRequest> request(new SendRequest(size));
+  Future<size_t> future = request->promise.future();
+
+  // We don't add an 'onDiscard' continuation to send because we can
+  // not accurately detect how many bytes have been sent. Once we pass
+  // the data to the bufferevent, there is the possibility that parts
+  // of it have been sent. Another reason is that if we send partial
+  // messages (discard only a part of the data), then it is likely
+  // that the receiving end will fail parsing the message.
+
+  // Assign 'send_request' under lock, fail on error.
+  synchronized (lock) {
+    if (send_request.get() != nullptr) {
+      return Failure("Socket is already sending");
+    }
+    std::swap(request, send_request);
+  }
+
+  evbuffer* buffer = CHECK_NOTNULL(evbuffer_new());
+
+  int result = evbuffer_add(buffer, data, size);
+  CHECK_EQ(0, result);
+
+  // Extend the life-time of 'this' through the execution of the
+  // lambda in the event loop. Note: The 'self' needs to be explicitly
+  // captured because we're not using it in the body of the lambda. We
+  // can use a 'shared_ptr' because run_in_event_loop is guaranteed to
+  // execute.
+  auto self = shared(this);
+
+  run_in_event_loop(
+      [self, buffer]() {
+        CHECK(__in_event_loop__);
+        CHECK(self);
+
+        // Check if the socket is closed or the write end has
+        // encountered an error in the interim (i.e. we received
+        // a BEV_EVENT_ERROR with BEV_EVENT_WRITING).
+        bool write = false;
+
+        synchronized (self->lock) {
+          if (self->send_request.get() != nullptr) {
+            write = true;
+          }
+        }
+
+        if (write) {
+          int result = bufferevent_write_buffer(self->bev, buffer);
+          CHECK_EQ(0, result);
+        }
+
+        evbuffer_free(buffer);
+      },
+      DISALLOW_SHORT_CIRCUIT);
+
+  return future;
+}
+
+
+Future<size_t> LibeventSSLSocketImpl::sendfile(
+    int_fd fd,
+    off_t offset,
+    size_t size)
+{
+  // Optimistically construct a 'SendRequest' and future.
+  Owned<SendRequest> request(new SendRequest(size));
+  Future<size_t> future = request->promise.future();
+
+  // Assign 'send_request' under lock, fail on error.
+  synchronized (lock) {
+    if (send_request.get() != nullptr) {
+      return Failure("Socket is already sending");
+    }
+    std::swap(request, send_request);
+  }
+
+  // Duplicate the file descriptor because Libevent will take ownership
+  // and control the lifecycle separately.
+  //
+  // TODO(josephw): We can avoid duplicating the file descriptor in
+  // future versions of Libevent. In Libevent versions 2.1.2 and later,
+  // we may use `evbuffer_file_segment_new` and `evbuffer_add_file_segment`
+  // instead of `evbuffer_add_file`.
+  Try<int_fd> dup = os::dup(fd);
+  if (dup.isError()) {
+    return Failure(dup.error());
+  }
+
+  // NOTE: This is *not* an `int_fd` because `libevent` requires a CRT
+  // integer file descriptor, which we allocate and then use
+  // exclusively here.
+#ifdef __WINDOWS__
+  int owned_fd = dup->crt();
+  // The `os::cloexec` and `os::nonblock` functions do nothing on
+  // Windows, and cannot be called because they take `int_fd`.
+#else
+  int owned_fd = dup.get();
+
+  // Set the close-on-exec flag.
+  Try<Nothing> cloexec = os::cloexec(owned_fd);
+  if (cloexec.isError()) {
+    os::close(owned_fd);
+    return Failure(
+        "Failed to set close-on-exec on duplicated file descriptor: " +
+        cloexec.error());
+  }
+
+  // Make the file descriptor non-blocking.
+  Try<Nothing> nonblock = os::nonblock(owned_fd);
+  if (nonblock.isError()) {
+    os::close(owned_fd);
+    return Failure(
+        "Failed to make duplicated file descriptor non-blocking: " +
+        nonblock.error());
+  }
+#endif // __WINDOWS__
+
+  // Extend the life-time of 'this' through the execution of the
+  // lambda in the event loop. Note: The 'self' needs to be explicitly
+  // captured because we're not using it in the body of the lambda. We
+  // can use a 'shared_ptr' because run_in_event_loop is guaranteed to
+  // execute.
+  auto self = shared(this);
+
+  run_in_event_loop(
+      [self, owned_fd, offset, size]() {
+        CHECK(__in_event_loop__);
+        CHECK(self);
+
+        // Check if the socket is closed or the write end has
+        // encountered an error in the interim (i.e. we received
+        // a BEV_EVENT_ERROR with BEV_EVENT_WRITING).
+        bool write = false;
+
+        synchronized (self->lock) {
+          if (self->send_request.get() != nullptr) {
+            write = true;
+          }
+        }
+
+        if (write) {
+          // NOTE: `evbuffer_add_file` will take ownership of the file
+          // descriptor and close it after it has finished reading it.
+          int result = evbuffer_add_file(
+              bufferevent_get_output(self->bev),
+              owned_fd,
+              offset,
+              size);
+          CHECK_EQ(0, result);
+        } else {
+#ifdef __WINDOWS__
+          // NOTE: `os::close()` on Windows is not compatible with CRT
+          // file descriptors, only `HANDLE` and `SOCKET` types.
+          ::_close(owned_fd);
+#else
+          os::close(owned_fd);
+#endif // __WINDOWS__
+        }
+      },
+      DISALLOW_SHORT_CIRCUIT);
+
+  return future;
+}
+
+
+Try<Nothing> LibeventSSLSocketImpl::listen(int backlog)
+{
+  if (listener != nullptr) {
+    return Error("Socket is already listening");
+  }
+
+  CHECK(bev == nullptr);
+
+  // NOTE: Accepted sockets are nonblocking by default in libevent, but
+  // can be set to block via the `LEV_OPT_LEAVE_SOCKETS_BLOCKING`
+  // flag for `evconnlistener_new`.
+  listener = evconnlistener_new(
+      base,
+      [](evconnlistener* listener,
+         evutil_socket_t socket,
+         sockaddr* addr,
+         int addr_length,
+         void* arg) {
+        CHECK(__in_event_loop__);
+
+        std::weak_ptr<LibeventSSLSocketImpl>* handle =
+          reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>(
+              CHECK_NOTNULL(arg));
+
+        std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock());
+
+#ifndef __WINDOWS__
+        // NOTE: Passing the flag `LEV_OPT_CLOSE_ON_EXEC` into
+        // `evconnlistener_new` would atomically set `SOCK_CLOEXEC`
+        // on the accepted socket. However, this flag is not supported
+        // in the minimum recommended version of libevent (2.0.22).
+        Try<Nothing> cloexec = os::cloexec(socket);
+        if (cloexec.isError()) {
+          VLOG(2) << "Failed to accept, cloexec: " << cloexec.error();
+
+          // Propagate the error through the listener's `accept_queue`.
+          if (impl != nullptr) {
+            impl->accept_queue.put(
+                Failure("Failed to accept, cloexec: " + cloexec.error()));
+          }
+
+          os::close(socket);
+          return;
+        }
+#endif // __WINDOWS__
+
+        if (impl != nullptr) {
+          Try<net::IP> ip = net::IP::create(*addr);
+          if (ip.isError()) {
+            VLOG(2) << "Could not convert sockaddr to net::IP: " << ip.error();
+          }
+
+          // We pass the 'listener' into the 'AcceptRequest' because
+          // this function could be executed before 'this->listener'
+          // is set.
+          AcceptRequest* request =
+            new AcceptRequest(
+                  // NOTE: The `int_fd` must be explicitly constructed
+                  // to avoid the `intptr_t` being casted to an `int`,
+                  // resulting in a `HANDLE` instead of a `SOCKET` on
+                  // Windows.
+                  int_fd(socket),
+                  listener,
+                  ip.isSome() ? Option<net::IP>(ip.get()) : None());
+
+          impl->accept_callback(request);
+        }
+      },
+      event_loop_handle,
+      LEV_OPT_REUSEABLE,
+      backlog,
+      s);
+
+  if (listener == nullptr) {
+    return Error("Failed to listen on socket");
+  }
+
+  // TODO(jmlvanre): attach an error callback.
+
+  return Nothing();
+}
+
+
+Future<std::shared_ptr<SocketImpl>> LibeventSSLSocketImpl::accept()
+{
+  // Note that due to MESOS-8448, when the caller discards, it's
+  // possible that we pull an accepted socket out of the queue but
+  // drop it when `.then` transitions to discarded rather than
+  // executing the continuation. This is currently acceptable since
+  // callers only discard when they're breaking their accept loop.
+  // However, from an API perspective, we shouldn't be dropping
+  // the socket on the floor.
+  //
+  // We explicitly specify the return type to avoid a type deduction
+  // issue in some versions of clang. See MESOS-2943.
+  return accept_queue.get()
+    .then([](const Future<std::shared_ptr<SocketImpl>>& impl)
+      -> Future<std::shared_ptr<SocketImpl>> {
+      CHECK(!impl.isPending());
+      return impl;
+    });
+}
+
+
+void LibeventSSLSocketImpl::peek_callback(
+    evutil_socket_t fd,
+    short what,
+    void* arg)
+{
+  CHECK(__in_event_loop__);
+
+  CHECK(what & EV_READ);
+  char data[6];
+
+  // Try to peek the first 6 bytes of the message.
+  ssize_t size = ::recv(fd, data, 6, MSG_PEEK);
+
+  // Based on the function 'ssl23_get_client_hello' in openssl, we
+  // test whether to dispatch to the SSL or non-SSL based accept based
+  // on the following rules:
+  //   1. If there are fewer than 3 bytes: non-SSL.
+  //   2. If the 1st bit of the 1st byte is set AND the 3rd byte is
+  //          equal to SSL2_MT_CLIENT_HELLO: SSL.
+  //   3. If the 1st byte is equal to SSL3_RT_HANDSHAKE AND the 2nd
+  //      byte is equal to SSL3_VERSION_MAJOR and the 6th byte is
+  //      equal to SSL3_MT_CLIENT_HELLO: SSL.
+  //   4. Otherwise: non-SSL.
+
+  // For an ascii based protocol to falsely get dispatched to SSL it
+  // needs to:
+  //   1. Start with an invalid ascii character (0x80).
+  //   2. OR have the first 2 characters be a SYN followed by ETX, and
+  //          then the 6th character be SOH.
+  // These conditions clearly do not constitute valid HTTP requests,
+  // and are unlikely to collide with other existing protocols.
+
+  bool ssl = false; // Default to rule 4.
+
+  if (size < 2) { // Rule 1.
+    ssl = false;
+  } else if ((data[0] & 0x80) && data[2] == SSL2_MT_CLIENT_HELLO) { // Rule 2.
+    ssl = true;
+  } else if (data[0] == SSL3_RT_HANDSHAKE &&
+             data[1] == SSL3_VERSION_MAJOR &&
+             data[5] == SSL3_MT_CLIENT_HELLO) { // Rule 3.
+    ssl = true;
+  }
+
+  AcceptRequest* request = reinterpret_cast<AcceptRequest*>(arg);
+
+  // We call 'event_free()' here because it ensures the event is made
+  // non-pending and inactive before it gets deallocated.
+  event_free(request->peek_event);
+  request->peek_event = nullptr;
+
+  if (ssl) {
+    accept_SSL_callback(request);
+  } else {
+    // Downgrade to a non-SSL socket implementation.
+    //
+    // NOTE: The `int_fd` must be explicitly constructed to avoid the
+    // `intptr_t` being casted to an `int`, resulting in a `HANDLE`
+    // instead of a `SOCKET` on Windows.
+    Try<std::shared_ptr<SocketImpl>> impl = PollSocketImpl::create(int_fd(fd));
+    if (impl.isError()) {
+      request->promise.fail(impl.error());
+    } else {
+      request->promise.set(impl.get());
+    }
+
+    delete request;
+  }
+}
+
+
+void LibeventSSLSocketImpl::accept_callback(AcceptRequest* request)
+{
+  CHECK(__in_event_loop__);
+
+  Queue<Future<std::shared_ptr<SocketImpl>>> accept_queue_ = accept_queue;
+
+  // After the socket is accepted, it must complete the SSL
+  // handshake (or be downgraded to a regular socket) before
+  // we put it in the queue of connected sockets.
+  request->promise.future()
+    .onAny([accept_queue_](Future<std::shared_ptr<SocketImpl>> impl) mutable {
+      accept_queue_.put(impl);
+    });
+
+  // If we support downgrading the connection, first wait for this
+  // socket to become readable. We will then MSG_PEEK it to test
+  // whether we want to dispatch as SSL or non-SSL.
+  if (openssl::flags().support_downgrade) {
+    request->peek_event = event_new(
+        base,
+        request->socket,
+        EV_READ,
+        &LibeventSSLSocketImpl::peek_callback,
+        request);
+    event_add(request->peek_event, nullptr);
+  } else {
+    accept_SSL_callback(request);
+  }
+}
+
+
+void LibeventSSLSocketImpl::accept_SSL_callback(AcceptRequest* request)
+{
+  CHECK(__in_event_loop__);
+
+  // Set up SSL object.
+  SSL* ssl = SSL_new(openssl::context());
+  if (ssl == nullptr) {
+    request->promise.fail("Accept failed, SSL_new");
+    delete request;
+    return;
+  }
+
+  // We use 'request->listener' because 'this->listener' may not have
+  // been set by the time this function is executed. See comment in
+  // the lambda for evconnlistener_new in
+  // 'LibeventSSLSocketImpl::listen'.
+  event_base* ev_base = evconnlistener_get_base(request->listener);
+
+  // Construct the bufferevent in the accepting state.
+  bufferevent* bev = bufferevent_openssl_socket_new(
+      ev_base,
+      request->socket,
+      ssl,
+      BUFFEREVENT_SSL_ACCEPTING,
+      BEV_OPT_THREADSAFE);
+
+  if (bev == nullptr) {
+    request->promise.fail("Accept failed: bufferevent_openssl_socket_new");
+    SSL_free(ssl);
+    delete request;
+    return;
+  }
+
+  bufferevent_setcb(
+      bev,
+      nullptr,
+      nullptr,
+      [](bufferevent* bev, short events, void* arg) {
+        // This handles error states or 'BEV_EVENT_CONNECTED' events
+        // and satisfies the promise by constructing a new socket if
+        // the connection was successfuly established.
+        CHECK(__in_event_loop__);
+
+        AcceptRequest* request =
+          reinterpret_cast<AcceptRequest*>(CHECK_NOTNULL(arg));
+
+        if (events & BEV_EVENT_EOF) {
+          request->promise.fail("Failed accept: connection closed");
+        } else if (events & BEV_EVENT_CONNECTED) {
+          // We will receive a 'CONNECTED' state on an accepting socket
+          // once the connection is established. Time to do
+          // post-verification. First, we need to determine the peer
+          // hostname.
+          Option<string> peer_hostname = None();
+
+          if (request->ip.isSome()) {
+            Try<string> hostname = net::getHostname(request->ip.get());
+
+            if (hostname.isError()) {
+              VLOG(2) << "Could not determine hostname of peer: "
+                      << hostname.error();
+            } else {
+              VLOG(2) << "Accepting from " << hostname.get();
+              peer_hostname = hostname.get();
+            }
+          }
+
+          SSL* ssl = bufferevent_openssl_get_ssl(bev);
+          CHECK_NOTNULL(ssl);
+
+          Try<Nothing> verify =
+            openssl::verify(ssl, peer_hostname, request->ip);
+
+          if (verify.isError()) {
+            VLOG(1) << "Failed accept, verification error: " << verify.error();
+            request->promise.fail(verify.error());
+            SSL_free(ssl);
+            bufferevent_free(bev);
+            // TODO(jmlvanre): Clean up for readability. Consider RAII
+            // or constructing the impl earlier.
+            CHECK(request->socket >= 0);
+            Try<Nothing> close = os::close(request->socket);
+            if (close.isError()) {
+              LOG(FATAL)
+                << "Failed to close socket " << stringify(request->socket)
+                << ": " << close.error();
+            }
+            delete request;
+            return;
+          }
+
+          auto impl = std::shared_ptr<LibeventSSLSocketImpl>(
+              new LibeventSSLSocketImpl(
+                  request->socket,
+                  bev,
+                  std::move(peer_hostname)));
+
+          // See comment at 'initialize' declaration for why we call
+          // this.
+          impl->initialize();
+
+          // We have to wait till after 'initialize()' is invoked for
+          // event_loop_handle to be valid as a callback argument for
+          // the callbacks.
+          bufferevent_setcb(
+              CHECK_NOTNULL(impl->bev),
+              &LibeventSSLSocketImpl::recv_callback,
+              &LibeventSSLSocketImpl::send_callback,
+              &LibeventSSLSocketImpl::event_callback,
+              CHECK_NOTNULL(impl->event_loop_handle));
+
+          request->promise.set(std::dynamic_pointer_cast<SocketImpl>(impl));
+        } else if (events & BEV_EVENT_ERROR) {
+          std::ostringstream stream;
+          if (EVUTIL_SOCKET_ERROR() != 0) {
+            stream << evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR());
+          } else {
+            char buffer[1024] = {};
+            unsigned long error = bufferevent_get_openssl_error(bev);
+            ERR_error_string_n(error, buffer, sizeof(buffer));
+            stream << buffer;
+          }
+
+          // Fail the accept request and log the error.
+          VLOG(1) << "Socket error: " << stream.str();
+
+          SSL* ssl = bufferevent_openssl_get_ssl(CHECK_NOTNULL(bev));
+          SSL_free(ssl);
+          bufferevent_free(bev);
+
+          // TODO(jmlvanre): Clean up for readability. Consider RAII
+          // or constructing the impl earlier.
+          CHECK(request->socket >= 0);
+          Try<Nothing> close = os::close(request->socket);
+          if (close.isError()) {
+            LOG(FATAL)
+              << "Failed to close socket " << stringify(request->socket)
+              << ": " << close.error();
+          }
+          request->promise.fail(
+              "Failed accept: connection error: " + stream.str());
+        }
+
+        delete request;
+      },
+      request);
+}
+
+} // namespace internal {
+} // namespace network {
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/posix/libevent/libevent_ssl_socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/posix/libevent/libevent_ssl_socket.hpp 
b/3rdparty/libprocess/src/posix/libevent/libevent_ssl_socket.hpp
new file mode 100644
index 0000000..6ef5a86
--- /dev/null
+++ b/3rdparty/libprocess/src/posix/libevent/libevent_ssl_socket.hpp
@@ -0,0 +1,198 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#ifndef __LIBEVENT_SSL_SOCKET_HPP__
+#define __LIBEVENT_SSL_SOCKET_HPP__
+
+#include <event2/buffer.h>
+#include <event2/bufferevent_ssl.h>
+#include <event2/event.h>
+#include <event2/listener.h>
+#include <event2/util.h>
+
+#include <atomic>
+#include <memory>
+
+#include <process/queue.hpp>
+#include <process/socket.hpp>
+
+namespace process {
+namespace network {
+namespace internal {
+
+class LibeventSSLSocketImpl : public SocketImpl
+{
+public:
+  // See 'Socket::create()'.
+  static Try<std::shared_ptr<SocketImpl>> create(int_fd s);
+
+  LibeventSSLSocketImpl(int_fd _s);
+
+  ~LibeventSSLSocketImpl() override;
+
+  // Implement 'SocketImpl' interface.
+  Future<Nothing> connect(const Address& address) override;
+  Future<size_t> recv(char* data, size_t size) override;
+  // Send does not currently support discard. See implementation.
+  Future<size_t> send(const char* data, size_t size) override;
+  Future<size_t> sendfile(int_fd fd, off_t offset, size_t size) override;
+  Try<Nothing> listen(int backlog) override;
+  Future<std::shared_ptr<SocketImpl>> accept() override;
+  SocketImpl::Kind kind() const override { return SocketImpl::Kind::SSL; }
+
+  // Shuts down the socket.
+  //
+  // NOTE: Although this method accepts an integer which specifies the
+  // shutdown mode, this parameter is ignored because SSL connections
+  // do not have a concept of read/write-only shutdown. If either end
+  // of the socket is closed, then the futures of any outstanding read
+  // requests will be completed (possibly as failures).
+  Try<Nothing, SocketError> shutdown(int how) override;
+
+  // We need a post-initializer because 'shared_from_this()' is not
+  // valid until the constructor has finished.
+  void initialize();
+
+private:
+  // A set of helper functions that transitions an accepted socket to
+  // an SSL connected socket. With the libevent-openssl library, once
+  // we return from the 'accept_callback()' which is scheduled by
+  // 'listen' then we still need to wait for the 'BEV_EVENT_CONNECTED'
+  // state before we know the SSL connection has been established.
+  struct AcceptRequest
+  {
+    AcceptRequest(
+        int_fd _socket,
+        evconnlistener* _listener,
+        const Option<net::IP>& _ip)
+      : peek_event(nullptr),
+        listener(_listener),
+        socket(_socket),
+        ip(_ip) {}
+    event* peek_event;
+    Promise<std::shared_ptr<SocketImpl>> promise;
+    evconnlistener* listener;
+    int_fd socket;
+    Option<net::IP> ip;
+  };
+
+  struct RecvRequest
+  {
+    RecvRequest(char* _data, size_t _size)
+      : data(_data), size(_size) {}
+    Promise<size_t> promise;
+    char* data;
+    size_t size;
+  };
+
+  struct SendRequest
+  {
+    SendRequest(size_t _size)
+      : size(_size) {}
+    Promise<size_t> promise;
+    size_t size;
+  };
+
+  struct ConnectRequest
+  {
+    Promise<Nothing> promise;
+  };
+
+  // This is a private constructor used by the accept helper
+  // functions.
+  LibeventSSLSocketImpl(
+      int_fd _s,
+      bufferevent* bev,
+      Option<std::string>&& peer_hostname);
+
+  // This is called when the equivalent of 'accept' returns. The role
+  // of this function is to set up the SSL object and bev. If we
+  // support both SSL and non-SSL traffic simultaneously then we first
+  // wait for data to be ready and test the hello handshake to
+  // disambiguate between the kinds of traffic.
+  void accept_callback(AcceptRequest* request);
+
+  // This is the continuation of 'accept_callback' that handles an SSL
+  // connection.
+  static void accept_SSL_callback(AcceptRequest* request);
+
+  // This function peeks at the data on an accepted socket to see if
+  // there is an SSL handshake or not. It then dispatches to the
+  // SSL handling function or creates a non-SSL socket.
+  static void peek_callback(evutil_socket_t fd, short what, void* arg);
+
+  // The following are function pairs of static functions to member
+  // functions. The static functions test and hold the weak pointer to
+  // the socket before calling the member functions. This protects
+  // against the socket being destroyed before the event loop calls
+  // the callbacks.
+  static void recv_callback(bufferevent* bev, void* arg);
+  void recv_callback();
+
+  static void send_callback(bufferevent* bev, void* arg);
+  void send_callback();
+
+  static void event_callback(bufferevent* bev, short events, void* arg);
+  void event_callback(short events);
+
+  bufferevent* bev;
+
+  evconnlistener* listener;
+
+  // Protects the following instance variables.
+  std::atomic_flag lock = ATOMIC_FLAG_INIT;
+  Owned<RecvRequest> recv_request;
+  Owned<SendRequest> send_request;
+  Owned<ConnectRequest> connect_request;
+
+  // Indicates whether or not an EOF has been received on this socket.
+  // Our accesses to this member are not synchronized because they all
+  // occur within the event loop, which runs on a single thread.
+  bool received_eof = false;
+
+  // This is a weak pointer to 'this', i.e., ourselves, this class
+  // instance. We need this for our event loop callbacks because it's
+  // possible that we'll actually want to cleanup this socket impl
+  // before the event loop callback gets executed ... and we'll check
+  // in each event loop callback whether or not this weak_ptr is valid
+  // by attempting to upgrade it to shared_ptr. It is the
+  // responsibility of the event loop through the deferred lambda in
+  // the destructor to clean up this pointer.
+  // 1) It is a 'weak_ptr' as opposed to a 'shared_ptr' because we
+  // want to test whether the object is still around from within the
+  // event loop. If it was a 'shared_ptr' then we would be
+  // contributing to the lifetime of the object and would no longer be
+  // able to test the lifetime correctly.
+  // 2) This is a pointer to a 'weak_ptr' so that we can pass this
+  // through to the event loop through the C-interface. We need access
+  // to the 'weak_ptr' from outside the object (in the event loop) to
+  // test if the object is still alive. By maintaining this 'weak_ptr'
+  // on the heap we can be sure it is safe to access from the
+  // event loop until it is destroyed.
+  std::weak_ptr<LibeventSSLSocketImpl>* event_loop_handle;
+
+  // This queue stores accepted sockets that are considered connected
+  // (either the SSL handshake has completed or the socket has been
+  // downgraded). The 'accept()' call returns sockets from this queue.
+  // We wrap the socket in a 'Future' so that we can pass failures or
+  // discards through.
+  Queue<Future<std::shared_ptr<SocketImpl>>> accept_queue;
+
+  Option<std::string> peer_hostname;
+  Option<net::IP> peer_ip;
+};
+
+} // namespace internal {
+} // namespace network {
+} // namespace process {
+
+#endif // __LIBEVENT_SSL_SOCKET_HPP__

Reply via email to