http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/poll_socket.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp deleted file mode 100644 index 74acb69..0000000 --- a/3rdparty/libprocess/src/poll_socket.cpp +++ /dev/null @@ -1,278 +0,0 @@ -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License - - -#ifdef __WINDOWS__ -#include <stout/windows.hpp> -#else -#include <netinet/tcp.h> -#endif // __WINDOWS__ - -#include <process/io.hpp> -#include <process/loop.hpp> -#include <process/network.hpp> -#include <process/socket.hpp> - -#include <stout/os/sendfile.hpp> -#include <stout/os/strerror.hpp> -#include <stout/os.hpp> - -#include "config.hpp" -#include "poll_socket.hpp" - -using std::string; - -namespace process { -namespace network { -namespace internal { - -Try<std::shared_ptr<SocketImpl>> PollSocketImpl::create(int_fd s) -{ - return std::make_shared<PollSocketImpl>(s); -} - - -Try<Nothing> PollSocketImpl::listen(int backlog) -{ - if (::listen(get(), backlog) < 0) { - return ErrnoError(); - } - return Nothing(); -} - - -Future<std::shared_ptr<SocketImpl>> PollSocketImpl::accept() -{ - // Need to hold a copy of `this` so that the underlying socket - // doesn't end up getting reused before we return from the call to - // `io::poll` and end up accepting a socket incorrectly. 
- auto self = shared(this); - - return io::poll(get(), io::READ) - .then([self]() -> Future<std::shared_ptr<SocketImpl>> { - Try<int_fd> accepted = network::accept(self->get()); - if (accepted.isError()) { - return Failure(accepted.error()); - } - - int_fd s = accepted.get(); - Try<Nothing> nonblock = os::nonblock(s); - if (nonblock.isError()) { - os::close(s); - return Failure("Failed to accept, nonblock: " + nonblock.error()); - } - - Try<Nothing> cloexec = os::cloexec(s); - if (cloexec.isError()) { - os::close(s); - return Failure("Failed to accept, cloexec: " + cloexec.error()); - } - - Try<Address> address = network::address(s); - if (address.isError()) { - os::close(s); - return Failure("Failed to get address: " + address.error()); - } - - // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait. - // NOTE: We cast to `char*` here because the function prototypes - // on Windows use `char*` instead of `void*`. - if (address->family() == Address::Family::INET4 || - address->family() == Address::Family::INET6) { - int on = 1; - if (::setsockopt( - s, - SOL_TCP, - TCP_NODELAY, - reinterpret_cast<const char*>(&on), - sizeof(on)) < 0) { - const string error = os::strerror(errno); - os::close(s); - return Failure( - "Failed to turn off the Nagle algorithm: " + stringify(error)); - } - } - - Try<std::shared_ptr<SocketImpl>> impl = create(s); - if (impl.isError()) { - os::close(s); - return Failure("Failed to create socket: " + impl.error()); - } - - return impl.get(); - }); -} - - -Future<Nothing> PollSocketImpl::connect(const Address& address) -{ - Try<Nothing, SocketError> connect = network::connect(get(), address); - if (connect.isError()) { - if (net::is_inprogress_error(connect.error().code)) { - // Need to hold a copy of `this` so that the underlying socket - // doesn't end up getting reused before we return from the call - // to `io::poll` and end up connecting incorrectly. 
- auto self = shared(this); - - return io::poll(get(), io::WRITE) - .then([self, address]() -> Future<Nothing> { - // Now check that a successful connection was made. - int opt; - socklen_t optlen = sizeof(opt); - - // NOTE: We cast to `char*` here because the function - // prototypes on Windows use `char*` instead of `void*`. - if (::getsockopt( - self->get(), - SOL_SOCKET, - SO_ERROR, - reinterpret_cast<char*>(&opt), - &optlen) < 0) { - return Failure(SocketError( - "Failed to get status of connect to " + stringify(address))); - } - - if (opt != 0) { - return Failure(SocketError( - opt, - "Failed to connect to " + - stringify(address))); - } - - return Nothing(); - }); - } - - return Failure(connect.error()); - } - - return Nothing(); -} - - -Future<size_t> PollSocketImpl::recv(char* data, size_t size) -{ - // Need to hold a copy of `this` so that the underlying socket - // doesn't end up getting reused before we return from the call to - // `io::read` and end up reading data incorrectly. - auto self = shared(this); - - return io::read(get(), data, size) - .then([self](size_t length) { - return length; - }); -} - - -Future<size_t> PollSocketImpl::send(const char* data, size_t size) -{ - CHECK(size > 0); // TODO(benh): Just return 0 if `size` is 0? - - // Need to hold a copy of `this` so that the underlying socket - // doesn't end up getting reused before we return. - auto self = shared(this); - - // TODO(benh): Reuse `io::write`? Or is `net::send` and - // `MSG_NOSIGNAL` critical here? - return loop( - None(), - [self, data, size]() -> Future<Option<size_t>> { - while (true) { - ssize_t length = net::send(self->get(), data, size, MSG_NOSIGNAL); - - if (length < 0) { -#ifdef __WINDOWS__ - int error = WSAGetLastError(); -#else - int error = errno; -#endif // __WINDOWS__ - - if (net::is_restartable_error(error)) { - // Interrupted, try again now. 
- continue; - } else if (!net::is_retryable_error(error)) { - // TODO(benh): Confirm that `os::strerror` does the - // right thing for `error` on Windows. - VLOG(1) << "Socket error while sending: " << os::strerror(error); - return Failure(os::strerror(error)); - } - - return None(); - } - - return length; - } - }, - [self](const Option<size_t>& length) -> Future<ControlFlow<size_t>> { - // Retry after we've polled if we don't yet have a result. - if (length.isNone()) { - return io::poll(self->get(), io::WRITE) - .then([](short event) -> ControlFlow<size_t> { - CHECK_EQ(io::WRITE, event); - return Continue(); - }); - } - return Break(length.get()); - }); -} - - -Future<size_t> PollSocketImpl::sendfile(int_fd fd, off_t offset, size_t size) -{ - CHECK(size > 0); // TODO(benh): Just return 0 if `size` is 0? - - // Need to hold a copy of `this` so that the underlying socket - // doesn't end up getting reused before we return. - auto self = shared(this); - - return loop( - None(), - [self, fd, offset, size]() -> Future<Option<size_t>> { - while (true) { - Try<ssize_t, SocketError> length = os::sendfile( - self->get(), - fd, - offset, - size); - - if (length.isSome()) { - CHECK(length.get() >= 0); - return length.get(); - } - - if (net::is_restartable_error(length.error().code)) { - // Interrupted, try again now. - continue; - } else if (!net::is_retryable_error(length.error().code)) { - VLOG(1) << length.error().message; - return Failure(length.error()); - } - - return None(); - } - }, - [self](const Option<size_t>& length) -> Future<ControlFlow<size_t>> { - // Retry after we've polled if we don't yet have a result. - if (length.isNone()) { - return io::poll(self->get(), io::WRITE) - .then([](short event) -> ControlFlow<size_t> { - CHECK_EQ(io::WRITE, event); - return Continue(); - }); - } - return Break(length.get()); - }); -} - -} // namespace internal { -} // namespace network { -} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/posix/io.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/posix/io.cpp b/3rdparty/libprocess/src/posix/io.cpp new file mode 100644 index 0000000..3862e3b --- /dev/null +++ b/3rdparty/libprocess/src/posix/io.cpp @@ -0,0 +1,140 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License + +#include <process/future.hpp> +#include <process/io.hpp> +#include <process/loop.hpp> + +#include <stout/error.hpp> +#include <stout/none.hpp> +#include <stout/option.hpp> +#include <stout/os.hpp> + +#include <stout/os/int_fd.hpp> +#include <stout/os/read.hpp> +#include <stout/os/socket.hpp> +#include <stout/os/write.hpp> + +#include "io_internal.hpp" + +namespace process { +namespace io { +namespace internal { + +Future<size_t> read(int_fd fd, void* data, size_t size) +{ + // TODO(benh): Let the system calls do whatever they're supposed to + // rather than return 0 here? + if (size == 0) { + return 0; + } + + return loop( + None(), + [=]() -> Future<Option<size_t>> { + // Because the file descriptor is non-blocking, we call + // read()/recv() immediately. If no data is available then + // we'll call `poll` and block. We also observed that for some + // combination of libev and Linux kernel versions, the poll + // would block for non-deterministically long periods of + // time. 
This may be fixed in a newer version of libev (we use + // 3.8 at the time of writing this comment). + ssize_t length = os::read(fd, data, size); + if (length < 0) { +#ifdef __WINDOWS__ + WindowsSocketError error; +#else + ErrnoError error; +#endif // __WINDOWS__ + + if (!net::is_restartable_error(error.code) && + !net::is_retryable_error(error.code)) { + return Failure(error.message); + } + + return None(); + } + + return length; + }, + [=](const Option<size_t>& length) -> Future<ControlFlow<size_t>> { + // Restart/retry if we don't yet have a result. + if (length.isNone()) { + return io::poll(fd, io::READ) + .then([](short event) -> ControlFlow<size_t> { + CHECK_EQ(io::READ, event); + return Continue(); + }); + } + return Break(length.get()); + }); +} + + +Future<size_t> write(int_fd fd, const void* data, size_t size) +{ + // TODO(benh): Let the system calls do what ever they're supposed to + // rather than return 0 here? + if (size == 0) { + return 0; + } + + return loop( + None(), + [=]() -> Future<Option<size_t>> { + ssize_t length = os::write(fd, data, size); + + if (length < 0) { +#ifdef __WINDOWS__ + WindowsSocketError error; +#else + ErrnoError error; +#endif // __WINDOWS__ + + if (!net::is_restartable_error(error.code) && + !net::is_retryable_error(error.code)) { + return Failure(error.message); + } + + return None(); + } + + return length; + }, + [=](const Option<size_t>& length) -> Future<ControlFlow<size_t>> { + // Restart/retry if we don't yet have a result. 
+ if (length.isNone()) { + return io::poll(fd, io::WRITE) + .then([](short event) -> ControlFlow<size_t> { + CHECK_EQ(io::WRITE, event); + return Continue(); + }); + } + return Break(length.get()); + }); +} + + +Try<Nothing> prepare_async(int_fd fd) +{ + return os::nonblock(fd); +} + + +Try<bool> is_async(int_fd fd) +{ + return os::isNonblock(fd); +} + +} // namespace internal { +} // namespace io { +} // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/posix/libev/libev.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/posix/libev/libev.cpp b/3rdparty/libprocess/src/posix/libev/libev.cpp new file mode 100644 index 0000000..173ee46 --- /dev/null +++ b/3rdparty/libprocess/src/posix/libev/libev.cpp @@ -0,0 +1,166 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License + +#include <ev.h> + +#include <mutex> +#include <queue> + +#include <stout/duration.hpp> +#include <stout/lambda.hpp> +#include <stout/nothing.hpp> + +#include "event_loop.hpp" +#include "libev.hpp" + +namespace process { + +ev_async async_watcher; +// We need an asynchronous watcher to receive the request to shutdown. +ev_async shutdown_watcher; + +// Define the initial values for all of the declarations made in +// libev.hpp (since these need to live in the static data space). 
+struct ev_loop* loop = nullptr; + +std::queue<ev_io*>* watchers = new std::queue<ev_io*>(); + +std::mutex* watchers_mutex = new std::mutex(); + +std::queue<lambda::function<void()>>* functions = + new std::queue<lambda::function<void()>>(); + +thread_local bool* _in_event_loop_ = nullptr; + + +void handle_async(struct ev_loop* loop, ev_async* _, int revents) +{ + std::queue<lambda::function<void()>> run_functions; + synchronized (watchers_mutex) { + // Start all the new I/O watchers. + while (!watchers->empty()) { + ev_io* watcher = watchers->front(); + watchers->pop(); + ev_io_start(loop, watcher); + } + + // Swap the functions into a temporary queue so that we can invoke + // them outside of the mutex. + std::swap(run_functions, *functions); + } + + // Running the functions outside of the mutex reduces locking + // contention as these are arbitrary functions that can take a long + // time to execute. Doing this also avoids a deadlock scenario where + // (A) mutexes are acquired before calling `run_in_event_loop`, + // followed by locking (B) `watchers_mutex`. If we executed the + // functions inside the mutex, then the locking order violation + // would be this function acquiring the (B) `watchers_mutex` + // followed by the arbitrary function acquiring the (A) mutexes. 
+ while (!run_functions.empty()) { + (run_functions.front())(); + run_functions.pop(); + } +} + + +void handle_shutdown(struct ev_loop* loop, ev_async* _, int revents) +{ + ev_unloop(loop, EVUNLOOP_ALL); +} + + +void EventLoop::initialize() +{ + loop = ev_default_loop(EVFLAG_AUTO); + + ev_async_init(&async_watcher, handle_async); + ev_async_init(&shutdown_watcher, handle_shutdown); + + ev_async_start(loop, &async_watcher); + ev_async_start(loop, &shutdown_watcher); +} + + +namespace internal { + +void handle_delay(struct ev_loop* loop, ev_timer* timer, int revents) +{ + lambda::function<void()>* function = + reinterpret_cast<lambda::function<void()>*>(timer->data); + (*function)(); + delete function; + ev_timer_stop(loop, timer); + delete timer; +} + + +Future<Nothing> delay( + const Duration& duration, + const lambda::function<void()>& function) +{ + ev_timer* timer = new ev_timer(); + timer->data = reinterpret_cast<void*>(new lambda::function<void()>(function)); + + // Determine the 'after' parameter to pass to libev and set it to 0 + // in the event that it's negative so that we always make sure to + // invoke 'function' even if libev doesn't support negative 'after' + // values. + double after = duration.secs(); + + if (after < 0) { + after = 0; + } + + const double repeat = 0.0; + + ev_timer_init(timer, handle_delay, after, repeat); + ev_timer_start(loop, timer); + + return Nothing(); +} + +} // namespace internal { + + +void EventLoop::delay( + const Duration& duration, + const lambda::function<void()>& function) +{ + run_in_event_loop<Nothing>( + lambda::bind(&internal::delay, duration, function)); +} + + +double EventLoop::time() +{ + // TODO(benh): Versus ev_now()? 
+ return ev_time(); +} + + +void EventLoop::run() +{ + __in_event_loop__ = true; + + ev_loop(loop, 0); + + __in_event_loop__ = false; +} + + +void EventLoop::stop() +{ + ev_async_send(loop, &shutdown_watcher); +} + +} // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/posix/libev/libev.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/posix/libev/libev.hpp b/3rdparty/libprocess/src/posix/libev/libev.hpp new file mode 100644 index 0000000..d451931 --- /dev/null +++ b/3rdparty/libprocess/src/posix/libev/libev.hpp @@ -0,0 +1,96 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License + +#ifndef __LIBEV_HPP__ +#define __LIBEV_HPP__ + +#include <ev.h> + +#include <mutex> +#include <queue> + +#include <process/future.hpp> +#include <process/owned.hpp> + +#include <stout/lambda.hpp> +#include <stout/synchronized.hpp> + +namespace process { + +// Event loop. +extern struct ev_loop* loop; + +// Asynchronous watcher for interrupting loop to specifically deal +// with IO watchers and functions (via run_in_event_loop). +extern ev_async async_watcher; + +// Queue of I/O watchers to be asynchronously added to the event loop +// (protected by 'watchers' below). +// TODO(benh): Replace this queue with functions that we put in +// 'functions' below that perform the ev_io_start themselves. 
+extern std::queue<ev_io*>* watchers; +extern std::mutex* watchers_mutex; + +// Queue of functions to be invoked asynchronously within the event +// loop (protected by 'watchers_mutex' above). +extern std::queue<lambda::function<void()>>* functions; + +// Per thread bool pointer. We use a pointer to lazily construct the +// actual bool. +extern thread_local bool* _in_event_loop_; + +#define __in_event_loop__ *(_in_event_loop_ == nullptr ? \ + _in_event_loop_ = new bool(false) : _in_event_loop_) + + +// Wrapper around function we want to run in the event loop. +template <typename T> +void _run_in_event_loop( + const lambda::function<Future<T>()>& f, + const Owned<Promise<T>>& promise) +{ + // Don't bother running the function if the future has been discarded. + if (promise->future().hasDiscard()) { + promise->discard(); + } else { + promise->set(f()); + } +} + + +// Helper for running a function in the event loop. +template <typename T> +Future<T> run_in_event_loop(const lambda::function<Future<T>()>& f) +{ + // If this is already the event loop then just run the function. + if (__in_event_loop__) { + return f(); + } + + Owned<Promise<T>> promise(new Promise<T>()); + + Future<T> future = promise->future(); + + // Enqueue the function. + synchronized (watchers_mutex) { + functions->push(lambda::bind(&_run_in_event_loop<T>, f, promise)); + } + + // Interrupt the loop. 
+ ev_async_send(loop, &async_watcher); + + return future; +} + +} // namespace process { + +#endif // __LIBEV_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/posix/libev/libev_poll.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/posix/libev/libev_poll.cpp b/3rdparty/libprocess/src/posix/libev/libev_poll.cpp new file mode 100644 index 0000000..96913a6 --- /dev/null +++ b/3rdparty/libprocess/src/posix/libev/libev_poll.cpp @@ -0,0 +1,142 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License + +#include <ev.h> + +#include <memory> + +#include <process/future.hpp> +#include <process/process.hpp> // For process::initialize. + +#include <stout/lambda.hpp> + +#include "libev.hpp" + +namespace process { + +// Data necessary for polling so we can discard polling and actually +// stop it in the event loop. +struct Poll +{ + Poll() + { + // Need to explicitly instantiate the watchers. + watcher.io.reset(new ev_io()); + watcher.async.reset(new ev_async()); + } + + // An I/O watcher for checking for readability or writeability and + // an async watcher for being able to discard the polling. + struct { + std::shared_ptr<ev_io> io; + std::shared_ptr<ev_async> async; + } watcher; + + Promise<short> promise; +}; + + +// Event loop callback when I/O is ready on polling file descriptor. 
+void polled(struct ev_loop* loop, ev_io* watcher, int revents) +{ + Poll* poll = (Poll*) watcher->data; + + ev_io_stop(loop, poll->watcher.io.get()); + + // Stop the async watcher (also clears if pending so 'discard_poll' + // will not get invoked and we can delete 'poll' here). + ev_async_stop(loop, poll->watcher.async.get()); + + poll->promise.set(revents); + + delete poll; +} + + +// Event loop callback when future associated with polling file +// descriptor has been discarded. +void discard_poll(struct ev_loop* loop, ev_async* watcher, int revents) +{ + Poll* poll = (Poll*) watcher->data; + + // Check and see if we have a pending 'polled' callback and if so + // let it "win". + if (ev_is_pending(poll->watcher.io.get())) { + return; + } + + ev_async_stop(loop, poll->watcher.async.get()); + + // Stop the I/O watcher (but note we check if pending above) so it + // won't get invoked and we can delete 'poll' here. + ev_io_stop(loop, poll->watcher.io.get()); + + poll->promise.discard(); + + delete poll; +} + + +namespace io { +namespace internal { + +// Helper/continuation of 'poll' on future discard. +void _poll(const std::shared_ptr<ev_async>& async) +{ + ev_async_send(loop, async.get()); +} + + +Future<short> poll(int_fd fd, short events) +{ + Poll* poll = new Poll(); + + // Have the watchers data point back to the struct. + poll->watcher.async->data = poll; + poll->watcher.io->data = poll; + + // Get a copy of the future to avoid any races with the event loop. + Future<short> future = poll->promise.future(); + + // Initialize and start the async watcher. + ev_async_init(poll->watcher.async.get(), discard_poll); + ev_async_start(loop, poll->watcher.async.get()); + + // Make sure we stop polling if a discard occurs on our future. 
+ // Note that it's possible that we'll invoke '_poll' when someone + // does a discard even after the polling has already completed, but + // in this case while we will interrupt the event loop since the + // async watcher has already been stopped we won't cause + // 'discard_poll' to get invoked. + future.onDiscard(lambda::bind(&_poll, poll->watcher.async)); + + // Initialize and start the I/O watcher. + ev_io_init(poll->watcher.io.get(), polled, fd, events); + ev_io_start(loop, poll->watcher.io.get()); + + return future; +} + +} // namespace internal { + + +Future<short> poll(int_fd fd, short events) +{ + process::initialize(); + + // TODO(benh): Check if the file descriptor is non-blocking? + + return run_in_event_loop<short>(lambda::bind(&internal::poll, fd, events)); +} + +} // namespace io { +} // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/posix/libevent/libevent.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/posix/libevent/libevent.cpp b/3rdparty/libprocess/src/posix/libevent/libevent.cpp new file mode 100644 index 0000000..fb595bc --- /dev/null +++ b/3rdparty/libprocess/src/posix/libevent/libevent.cpp @@ -0,0 +1,211 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License + +#ifndef __WINDOWS__ +#include <unistd.h> +#endif // __WINDOWS__ + +#include <mutex> + +#include <event2/event.h> +#include <event2/thread.h> +#include <event2/util.h> + +#include <process/logging.hpp> +#include <process/once.hpp> + +#include <stout/synchronized.hpp> + +#include "event_loop.hpp" +#include "libevent.hpp" + +namespace process { + +event_base* base = nullptr; + + +static std::mutex* functions_mutex = new std::mutex(); +std::queue<lambda::function<void()>>* functions = + new std::queue<lambda::function<void()>>(); + + +thread_local bool* _in_event_loop_ = nullptr; + + +void async_function(evutil_socket_t socket, short which, void* arg) +{ + event* ev = reinterpret_cast<event*>(arg); + event_free(ev); + + std::queue<lambda::function<void()>> q; + + synchronized (functions_mutex) { + std::swap(q, *functions); + } + + while (!q.empty()) { + q.front()(); + q.pop(); + } +} + + +void run_in_event_loop( + const lambda::function<void()>& f, + EventLoopLogicFlow event_loop_logic_flow) +{ + if (__in_event_loop__ && event_loop_logic_flow == ALLOW_SHORT_CIRCUIT) { + f(); + return; + } + + synchronized (functions_mutex) { + functions->push(f); + + // Add an event and activate it to interrupt the event loop. + // TODO(jmlvanre): after libevent v 2.1 we can use + // event_self_cbarg instead of re-assigning the event. For now we + // manually re-assign the event to pass in the pointer to the + // event itself as the callback argument. + event* ev = evtimer_new(base, async_function, nullptr); + + // 'event_assign' is only valid on non-pending AND non-active + // events. This means we have to assign the callback before + // calling 'event_active'. 
+ if (evtimer_assign(ev, base, async_function, ev) < 0) { + LOG(FATAL) << "Failed to assign callback on event"; + } + + event_active(ev, EV_TIMEOUT, 0); + } +} + + +void EventLoop::run() +{ + __in_event_loop__ = true; + + do { + int result = event_base_loop(base, EVLOOP_ONCE); + if (result < 0) { + LOG(FATAL) << "Failed to run event loop"; + } else if (result > 0) { + // All events are handled, continue event loop. + continue; + } else { + CHECK_EQ(0, result); + if (event_base_got_break(base)) { + break; + } else if (event_base_got_exit(base)) { + break; + } + } + } while (true); + + __in_event_loop__ = false; +} + + +void EventLoop::stop() +{ + event_base_loopexit(base, nullptr); +} + + +namespace internal { + +struct Delay +{ + lambda::function<void()> function; + event* timer; +}; + +void handle_delay(evutil_socket_t, short, void* arg) +{ + Delay* delay = reinterpret_cast<Delay*>(arg); + delay->function(); + event_free(delay->timer); + delete delay; +} + +} // namespace internal { + + +void EventLoop::delay( + const Duration& duration, + const lambda::function<void()>& function) +{ + internal::Delay* delay = new internal::Delay(); + delay->timer = evtimer_new(base, &internal::handle_delay, delay); + if (delay->timer == nullptr) { + LOG(FATAL) << "Failed to delay, evtimer_new"; + } + + delay->function = function; + + timeval t{0, 0}; + if (duration > Seconds(0)) { + t = duration.timeval(); + } + + evtimer_add(delay->timer, &t); +} + + +double EventLoop::time() +{ + // We explicitly call `evutil_gettimeofday()` for now to avoid any + // issues that may be introduced by using the cached value provided + // by `event_base_gettimeofday_cached()`. Since a lot of logic in + // libprocess depends on time math, we want to log fatal rather than + // cause logic errors if the time fails. 
+ timeval t; + if (evutil_gettimeofday(&t, nullptr) < 0) { + LOG(FATAL) << "Failed to get time, evutil_gettimeofday"; + } + + return Duration(t).secs(); +} + + +void EventLoop::initialize() +{ + static Once* initialized = new Once(); + + if (initialized->once()) { + return; + } + + // We need to initialize Libevent differently depending on the + // operating system threading support. +#if defined(EVTHREAD_USE_PTHREADS_IMPLEMENTED) + if (evthread_use_pthreads() < 0) { + LOG(FATAL) << "Failed to initialize, evthread_use_pthreads"; + } +#elif defined(EVTHREAD_USE_WINDOWS_THREADS_IMPLEMENTED) + if (evthread_use_windows_threads() < 0) { + LOG(FATAL) << "Failed to initialize, evthread_use_windows_threads"; + } +#else +#error "Libevent must be compiled with either pthread or Windows thread support" +#endif + + base = event_base_new(); + + if (base == nullptr) { + LOG(FATAL) << "Failed to initialize, event_base_new"; + } + + initialized->done(); +} + +} // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/posix/libevent/libevent.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/posix/libevent/libevent.hpp b/3rdparty/libprocess/src/posix/libevent/libevent.hpp new file mode 100644 index 0000000..2eb9790 --- /dev/null +++ b/3rdparty/libprocess/src/posix/libevent/libevent.hpp @@ -0,0 +1,48 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License + +#ifndef __LIBEVENT_HPP__ +#define __LIBEVENT_HPP__ + +#include <event2/event.h> + +#include <stout/lambda.hpp> + +namespace process { + +// Event loop. +extern event_base* base; + + +// Per thread bool pointer. We use a pointer to lazily construct the +// actual bool. +extern thread_local bool* _in_event_loop_; + + +#define __in_event_loop__ *(_in_event_loop_ == nullptr ? \ + _in_event_loop_ = new bool(false) : _in_event_loop_) + + +enum EventLoopLogicFlow +{ + ALLOW_SHORT_CIRCUIT, + DISALLOW_SHORT_CIRCUIT +}; + + +void run_in_event_loop( + const lambda::function<void()>& f, + EventLoopLogicFlow event_loop_logic_flow = ALLOW_SHORT_CIRCUIT); + +} // namespace process { + +#endif // __LIBEVENT_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/posix/libevent/libevent_poll.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/posix/libevent/libevent_poll.cpp b/3rdparty/libprocess/src/posix/libevent/libevent_poll.cpp new file mode 100644 index 0000000..038dde2 --- /dev/null +++ b/3rdparty/libprocess/src/posix/libevent/libevent_poll.cpp @@ -0,0 +1,112 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License + +#include <event2/event.h> + +#include <memory> + +#include <process/future.hpp> +#include <process/io.hpp> +#include <process/process.hpp> // For process::initialize. + +#include "libevent.hpp" + +namespace process { + +namespace io { +namespace internal { + +struct Poll +{ + Promise<short> promise; + std::shared_ptr<event> ev; +}; + + +void pollCallback(evutil_socket_t, short what, void* arg) +{ + Poll* poll = reinterpret_cast<Poll*>(arg); + + if (poll->promise.future().hasDiscard()) { + poll->promise.discard(); + } else { + // Convert libevent specific EV_READ / EV_WRITE to io::* specific + // values of these enumerations. + short events = + ((what & EV_READ) ? io::READ : 0) | ((what & EV_WRITE) ? io::WRITE : 0); + + poll->promise.set(events); + } + + // Deleting the `poll` also destructs `ev` and hence triggers `event_free`, + // which makes the event non-pending. + delete poll; +} + + +void pollDiscard(const std::weak_ptr<event>& ev, short events) +{ + // Discarding inside the event loop prevents `pollCallback()` from being + // called twice if the future is discarded. + run_in_event_loop([=]() { + std::shared_ptr<event> shared = ev.lock(); + // If `ev` cannot be locked `pollCallback` already ran. If it was locked + // but not pending, `pollCallback` is scheduled to be executed. + if (static_cast<bool>(shared) && + event_pending(shared.get(), events, nullptr)) { + // `event_active` will trigger the `pollCallback` to be executed. + event_active(shared.get(), EV_READ, 0); + } + }); +} + +} // namespace internal { + + +Future<short> poll(int_fd fd, short events) +{ + process::initialize(); + + internal::Poll* poll = new internal::Poll(); + + Future<short> future = poll->promise.future(); + + // Convert io::READ / io::WRITE to libevent specific values of these + // enumerations. + short what = + ((events & io::READ) ? EV_READ : 0) | ((events & io::WRITE) ? 
EV_WRITE : 0); + + // Bind `event_free` to the destructor of the `ev` shared pointer + // guaranteeing that the event will be freed only once. + poll->ev.reset( + event_new(base, fd, what, &internal::pollCallback, poll), + event_free); + + if (poll->ev == nullptr) { + LOG(FATAL) << "Failed to poll, event_new"; + } + + // Using a `weak_ptr` prevents `ev` to become a dangling pointer if + // the returned future is discarded after the event is triggered. + // The `weak_ptr` needs to be created before `event_add` in case + // the event is ready and the callback is executed before creating + // `ev`. + std::weak_ptr<event> ev(poll->ev); + + event_add(poll->ev.get(), nullptr); + + return future + .onDiscard(lambda::bind(&internal::pollDiscard, ev, what)); +} + +} // namespace io { +} // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/posix/libevent/libevent_ssl_socket.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/posix/libevent/libevent_ssl_socket.cpp b/3rdparty/libprocess/src/posix/libevent/libevent_ssl_socket.cpp new file mode 100644 index 0000000..436b389 --- /dev/null +++ b/3rdparty/libprocess/src/posix/libevent/libevent_ssl_socket.cpp @@ -0,0 +1,1249 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License + +#include <event2/buffer.h> +#include <event2/bufferevent_ssl.h> +#include <event2/event.h> +#include <event2/listener.h> +#include <event2/thread.h> +#include <event2/util.h> + +#include <openssl/ssl.h> +#include <openssl/err.h> + +#include <process/queue.hpp> +#include <process/socket.hpp> + +#include <process/ssl/flags.hpp> + +#include <stout/net.hpp> +#include <stout/synchronized.hpp> + +#include <stout/os/close.hpp> +#include <stout/os/dup.hpp> +#include <stout/os/fcntl.hpp> + +#include "libevent.hpp" +#include "libevent_ssl_socket.hpp" +#include "openssl.hpp" +#include "poll_socket.hpp" + +// Locking: +// +// We use the BEV_OPT_THREADSAFE flag when constructing bufferevents +// so that all **functions that are called from the event loop that +// take a bufferevent as a parameter will automatically have the +// lock acquired**. +// +// This means that everywhere that the libevent library does not +// already lock the bev, we need to manually 'synchronized (bev) {'. +// To further complicate matters, due to a deadlock scenario in +// libevent-openssl (v 2.0.21) we currently modify bufferevents using +// continuations in the event loop, but these functions, while run +// from within the event loop, are not passed the 'bev' as a parameter +// and thus MUST use 'synchronized (bev)'. See 'Continuation' comment +// below for more details on why we need to invoke these continuations +// from within the event loop. + +// Continuations via 'run_in_event_loop(...)': +// +// There is a deadlock scenario in libevent-openssl (v 2.0.21) when +// modifying the bufferevent (bev) from another thread (not the event +// loop). To avoid this we run all bufferevent manipulation logic in +// continuations that are executed within the event loop.
+ +// DISALLOW_SHORT_CIRCUIT: +// +// We disallow short-circuiting in 'run_in_event_loop' due to a bug in +// libevent_openssl with deferred callbacks still being called (still +// in the run queue) even though a bev has been disabled. + +using std::queue; +using std::string; + +// Specialization of 'synchronize' to use bufferevent with the +// 'synchronized' macro. +static Synchronized<bufferevent> synchronize(bufferevent* bev) +{ + return Synchronized<bufferevent>( + bev, + [](bufferevent* bev) { bufferevent_lock(bev); }, + [](bufferevent* bev) { bufferevent_unlock(bev); }); +} + +namespace process { +namespace network { +namespace internal { + +Try<std::shared_ptr<SocketImpl>> LibeventSSLSocketImpl::create(int_fd s) +{ + openssl::initialize(); + + if (!openssl::flags().enabled) { + return Error("SSL is disabled"); + } + + auto socket = std::make_shared<LibeventSSLSocketImpl>(s); + // See comment at 'initialize' declaration for why we call this. + socket->initialize(); + return socket; +} + + +LibeventSSLSocketImpl::~LibeventSSLSocketImpl() +{ + // We defer termination and destruction of all event loop specific + // calls and structures. This is a safety against the socket being + // destroyed before existing event loop calls have completed since + // they require valid data structures (the weak pointer). + // + // Release ownership of the file descriptor so that + // we can defer closing the socket. + int_fd fd = release(); + CHECK(fd >= 0); + + evconnlistener* _listener = listener; + bufferevent* _bev = bev; + std::weak_ptr<LibeventSSLSocketImpl>* _event_loop_handle = event_loop_handle; + + run_in_event_loop( + [_listener, _bev, _event_loop_handle, fd]() { + // Once this lambda is called, it should not be possible for + // more event loop callbacks to be triggered with 'this->bev'. + // This is important because we delete event_loop_handle which + // is the callback argument for any event loop callbacks. 
+ // This lambda is responsible for ensuring 'this->bev' is + // disabled, and cleaning up any remaining state associated + // with the event loop. + + CHECK(__in_event_loop__); + + if (_listener != nullptr) { + evconnlistener_free(_listener); + } + + if (_bev != nullptr) { + // NOTE: Removes all future callbacks using 'this->bev'. + bufferevent_disable(_bev, EV_READ | EV_WRITE); + + SSL* ssl = bufferevent_openssl_get_ssl(_bev); + SSL_free(ssl); + bufferevent_free(_bev); + } + + CHECK_SOME(os::close(fd)) << "Failed to close socket"; + + delete _event_loop_handle; + }, + DISALLOW_SHORT_CIRCUIT); +} + + +void LibeventSSLSocketImpl::initialize() +{ + event_loop_handle = new std::weak_ptr<LibeventSSLSocketImpl>(shared(this)); +} + + +Try<Nothing, SocketError> LibeventSSLSocketImpl::shutdown(int how) +{ + // Nothing to do if this socket was never initialized. + synchronized (lock) { + if (bev == nullptr) { + // If it was not initialized, then there should also be no + // requests. + CHECK(connect_request.get() == nullptr); + CHECK(recv_request.get() == nullptr); + CHECK(send_request.get() == nullptr); + + // We expect this to fail and generate an 'ENOTCONN' failure as + // no connection should exist at this point. + if (::shutdown(s, how) < 0) { + return SocketError(); + } + + return Nothing(); + } + } + + // Extend the life-time of 'this' through the execution of the + // lambda in the event loop. Note: The 'self' needs to be explicitly + // captured because we're not using it in the body of the lambda. We + // can use a 'shared_ptr' because run_in_event_loop is guaranteed to + // execute. + auto self = shared(this); + + run_in_event_loop( + [self]() { + CHECK(__in_event_loop__); + CHECK(self); + + CHECK_NOTNULL(self->bev); + + synchronized (self->bev) { + Owned<RecvRequest> request; + + // Swap the 'recv_request' under the object lock. 
+ synchronized (self->lock) { + std::swap(request, self->recv_request); + } + + // If there is still a pending receive request then close it. + if (request.get() != nullptr) { + request->promise + .set(bufferevent_read(self->bev, request->data, request->size)); + } + + // Workaround for SSL shutdown, see http://www.wangafu.net/~nickm/libevent-book/Ref6a_advanced_bufferevents.html // NOLINT + SSL* ssl = bufferevent_openssl_get_ssl(self->bev); + SSL_set_shutdown(ssl, SSL_RECEIVED_SHUTDOWN); + SSL_shutdown(ssl); + } + }, + DISALLOW_SHORT_CIRCUIT); + + return Nothing(); +} + + +// Only runs in event loop. No locks required. See 'Locking' note at +// top of file. +void LibeventSSLSocketImpl::recv_callback(bufferevent* /*bev*/, void* arg) +{ + CHECK(__in_event_loop__); + + std::weak_ptr<LibeventSSLSocketImpl>* handle = + reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>(CHECK_NOTNULL(arg)); + + std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock()); + + // Don't call the 'recv_callback' unless the socket is still valid. + if (impl != nullptr) { + impl->recv_callback(); + } +} + + +// Only runs in event loop. Member function continuation of static +// 'recv_callback'. This function can be called from two places - +// a) `LibeventSSLSocketImpl::recv` when a new Socket::recv is called and there +// is buffer available to read. +// b) `LibeventSSLSocketImpl::recv_callback` when libevent calls the deferred +// read callback. +void LibeventSSLSocketImpl::recv_callback() +{ + CHECK(__in_event_loop__); + + Owned<RecvRequest> request; + + const size_t buffer_length = evbuffer_get_length(bufferevent_get_input(bev)); + + // Swap out the request object IFF there is buffer available to read. We check + // this here because it is possible that when the libevent deferred callback + // was called, a Socket::recv context already read the buffer from the event. + // Following sequence is possible: + // a. libevent finds a buffer ready to be read. + // b.
libevent queues buffer event to be dispatched. + // c. Socket::recv is called that creates a new request. + // d. Socket::recv finds buffer length > 0. + // e. Socket::recv reads the buffer. + // f. A new request Socket::recv is called which creates a new request. + // g. libevent callback is called for the event queued at step b. + // h. libevent callback finds the length of the buffer as 0 but the request is + // a non-nullptr due to step f. + if (buffer_length > 0 || received_eof) { + synchronized (lock) { + std::swap(request, recv_request); + } + } + + if (request.get() != nullptr) { + if (buffer_length > 0) { + size_t length = bufferevent_read(bev, request->data, request->size); + CHECK(length > 0); + + request->promise.set(length); + } else { + CHECK(received_eof); + request->promise.set(0); + } + } +} + + +// Only runs in event loop. No locks required. See 'Locking' note at +// top of file. +void LibeventSSLSocketImpl::send_callback(bufferevent* /*bev*/, void* arg) +{ + CHECK(__in_event_loop__); + + std::weak_ptr<LibeventSSLSocketImpl>* handle = + reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>(CHECK_NOTNULL(arg)); + + std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock()); + + // Don't call the 'send_callback' unless the socket is still valid. + if (impl != nullptr) { + impl->send_callback(); + } +} + + +// Only runs in event loop. Member function continuation of static +// 'send_callback'. +void LibeventSSLSocketImpl::send_callback() +{ + CHECK(__in_event_loop__); + + Owned<SendRequest> request; + + synchronized (lock) { + std::swap(request, send_request); + } + + if (request.get() != nullptr) { + request->promise.set(request->size); + } +} + + +// Only runs in event loop. No locks required. See 'Locking' note at +// top of file.
+void LibeventSSLSocketImpl::event_callback( + bufferevent* /*bev*/, + short events, + void* arg) +{ + CHECK(__in_event_loop__); + + std::weak_ptr<LibeventSSLSocketImpl>* handle = + reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>(CHECK_NOTNULL(arg)); + + std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock()); + + // Don't call the 'event_callback' unless the socket is still valid. + if (impl != nullptr) { + impl->event_callback(events); + } +} + + +// Only runs in event loop. Member function continuation of static +// 'event_callback'. +void LibeventSSLSocketImpl::event_callback(short events) +{ + CHECK(__in_event_loop__); + + // TODO(bmahler): Libevent's invariant is that `events` contains: + // + // (1) one of BEV_EVENT_READING or BEV_EVENT_WRITING to + // indicate whether the event was on the read or write path. + // + // (2) one of BEV_EVENT_EOF, BEV_EVENT_ERROR, BEV_EVENT_TIMEOUT, + // BEV_EVENT_CONNECTED. + // + // (1) allows us to handle read and write errors separately. + // HOWEVER, for SSL bufferevents in 2.0.x, libevent never seems + // to tell us about BEV_EVENT_READING or BEV_EVENT_WRITING, + // which forces us to write incorrect logic here by treating all + // events as affecting both reads and writes. + // + // This has been fixed in 2.1.x: + // 2.1 "What's New": + // https://github.com/libevent/libevent/blob/release-2.1.8-stable/whatsnew-2.1.txt#L333-L335 // NOLINT + // Commit: + // https://github.com/libevent/libevent/commit/f7eb69ace + // + // We should require 2.1.x so that we can correctly distinguish + // between the read and write errors, and not have two code paths + // depending on the libevent version, see MESOS-5999, MESOS-6770.
+ + Owned<RecvRequest> current_recv_request; + Owned<SendRequest> current_send_request; + Owned<ConnectRequest> current_connect_request; + + if (events & BEV_EVENT_EOF || + events & BEV_EVENT_CONNECTED || + events & BEV_EVENT_ERROR) { + synchronized (lock) { + std::swap(current_recv_request, recv_request); + std::swap(current_send_request, send_request); + std::swap(current_connect_request, connect_request); + } + } + + // First handle EOF, we also look for `BEV_EVENT_ERROR` with + // `EVUTIL_SOCKET_ERROR() == 0` since this occurs as a result + // of a "dirty" SSL shutdown (i.e. TCP close before SSL close) + // or when this socket has been shut down and further sends + // are performed. + // + // TODO(bmahler): We don't expose "dirty" SSL shutdowns as + // recv errors, but perhaps we should? + if (events & BEV_EVENT_EOF || + (events & BEV_EVENT_ERROR && EVUTIL_SOCKET_ERROR() == 0)) { + received_eof = true; + + if (current_recv_request.get() != nullptr) { + // Drain any remaining data from the bufferevent or complete the + // promise with 0 to signify EOF. Because we set `received_eof`, + // subsequent calls to `recv` will return 0 if there is no data + // remaining on the buffer. 
+ if (evbuffer_get_length(bufferevent_get_input(bev)) > 0) { + size_t length = + bufferevent_read( + bev, + current_recv_request->data, + current_recv_request->size); + CHECK(length > 0); + + current_recv_request->promise.set(length); + } else { + current_recv_request->promise.set(0); + } + } + + if (current_send_request.get() != nullptr) { + current_send_request->promise.fail("Failed send: connection closed"); + } + + if (current_connect_request.get() != nullptr) { + SSL* ssl = bufferevent_openssl_get_ssl(CHECK_NOTNULL(bev)); + SSL_free(ssl); + bufferevent_free(CHECK_NOTNULL(bev)); + bev = nullptr; + current_connect_request->promise.fail( + "Failed connect: connection closed"); + } + } else if (events & BEV_EVENT_CONNECTED) { + // We should not have receiving or sending request while still + // connecting. + CHECK(current_recv_request.get() == nullptr); + CHECK(current_send_request.get() == nullptr); + CHECK_NOTNULL(current_connect_request.get()); + + // If we're connecting, then we've succeeded. Time to do + // post-verification. + CHECK_NOTNULL(bev); + + // Do post-validation of connection. + SSL* ssl = bufferevent_openssl_get_ssl(bev); + + Try<Nothing> verify = openssl::verify(ssl, peer_hostname, peer_ip); + if (verify.isError()) { + VLOG(1) << "Failed connect, verification error: " << verify.error(); + SSL_free(ssl); + bufferevent_free(bev); + bev = nullptr; + current_connect_request->promise.fail(verify.error()); + return; + } + + current_connect_request->promise.set(Nothing()); + } else if (events & BEV_EVENT_ERROR) { + CHECK(EVUTIL_SOCKET_ERROR() != 0); + std::ostringstream error_stream; + error_stream << evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()); + + // If there is a valid error, fail any requests and log the error. 
+ VLOG(1) << "Socket error: " << error_stream.str(); + + if (current_recv_request.get() != nullptr) { + current_recv_request->promise.fail( + "Failed recv, connection error: " + + error_stream.str()); + } + + if (current_send_request.get() != nullptr) { + current_send_request->promise.fail( + "Failed send, connection error: " + + error_stream.str()); + } + + if (current_connect_request.get() != nullptr) { + SSL* ssl = bufferevent_openssl_get_ssl(CHECK_NOTNULL(bev)); + SSL_free(ssl); + bufferevent_free(CHECK_NOTNULL(bev)); + bev = nullptr; + current_connect_request->promise.fail( + "Failed connect, connection error: " + + error_stream.str()); + } + } +} + + +LibeventSSLSocketImpl::LibeventSSLSocketImpl(int_fd _s) + : SocketImpl(_s), + bev(nullptr), + listener(nullptr), + recv_request(nullptr), + send_request(nullptr), + connect_request(nullptr), + event_loop_handle(nullptr) {} + + +LibeventSSLSocketImpl::LibeventSSLSocketImpl( + int_fd _s, + bufferevent* _bev, + Option<string>&& _peer_hostname) + : SocketImpl(_s), + bev(_bev), + listener(nullptr), + recv_request(nullptr), + send_request(nullptr), + connect_request(nullptr), + event_loop_handle(nullptr), + peer_hostname(std::move(_peer_hostname)) {} + + +Future<Nothing> LibeventSSLSocketImpl::connect(const Address& address) +{ + if (bev != nullptr) { + return Failure("Socket is already connected"); + } + + if (connect_request.get() != nullptr) { + return Failure("Socket is already connecting"); + } + + SSL* ssl = SSL_new(openssl::context()); + if (ssl == nullptr) { + return Failure("Failed to connect: SSL_new"); + } + + // Construct the bufferevent in the connecting state. + // We set 'BEV_OPT_DEFER_CALLBACKS' to avoid calling the + // 'event_callback' before 'bufferevent_socket_connect' returns. 
+ CHECK(bev == nullptr); + bev = bufferevent_openssl_socket_new( + base, + s, + ssl, + BUFFEREVENT_SSL_CONNECTING, + BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS); + + if (bev == nullptr) { + // We need to free 'ssl' here because the bev won't clean it up + // for us. + SSL_free(ssl); + return Failure("Failed to connect: bufferevent_openssl_socket_new"); + } + + if (address.family() == Address::Family::INET4 || + address.family() == Address::Family::INET6) { + // Try and determine the 'peer_hostname' from the address we're + // connecting to in order to properly verify the certificate + // later. + const Try<string> hostname = + network::convert<inet::Address>(address)->hostname(); + + if (hostname.isError()) { + VLOG(2) << "Could not determine hostname of peer: " << hostname.error(); + } else { + VLOG(2) << "Connecting to " << hostname.get(); + peer_hostname = hostname.get(); + } + + // Determine the 'peer_ip' from the address we're connecting to in + // order to properly verify the certificate later. + peer_ip = network::convert<inet::Address>(address)->ip; + } + + // Optimistically construct a 'ConnectRequest' and future. + Owned<ConnectRequest> request(new ConnectRequest()); + Future<Nothing> future = request->promise.future(); + + // Assign 'connect_request' under lock, fail on error. + synchronized (lock) { + if (connect_request.get() != nullptr) { + SSL_free(ssl); + bufferevent_free(bev); + bev = nullptr; + return Failure("Socket is already connecting"); + } + std::swap(request, connect_request); + } + + // Extend the life-time of 'this' through the execution of the + // lambda in the event loop. Note: The 'self' needs to be explicitly + // captured because we're not using it in the body of the lambda. We + // can use a 'shared_ptr' because run_in_event_loop is guaranteed to + // execute. + auto self = shared(this); + + run_in_event_loop( + [self, address]() { + sockaddr_storage addr = address; + + // Assign the callbacks for the bufferevent. 
We do this + // before the 'bufferevent_socket_connect()' call to avoid + // any race on the underlying buffer events becoming ready. + bufferevent_setcb( + self->bev, + &LibeventSSLSocketImpl::recv_callback, + &LibeventSSLSocketImpl::send_callback, + &LibeventSSLSocketImpl::event_callback, + CHECK_NOTNULL(self->event_loop_handle)); + + if (bufferevent_socket_connect( + self->bev, + reinterpret_cast<sockaddr*>(&addr), + address.size()) < 0) { + SSL* ssl = bufferevent_openssl_get_ssl(CHECK_NOTNULL(self->bev)); + SSL_free(ssl); + bufferevent_free(self->bev); + self->bev = nullptr; + + Owned<ConnectRequest> request; + + // Swap out the 'connect_request' so we can destroy it + // outside of the lock. + synchronized (self->lock) { + std::swap(request, self->connect_request); + } + + CHECK_NOTNULL(request.get()); + + // Fail the promise since we failed to connect. + request->promise.fail( + "Failed to connect: bufferevent_socket_connect"); + } + }, + DISALLOW_SHORT_CIRCUIT); + + return future; +} + + +Future<size_t> LibeventSSLSocketImpl::recv(char* data, size_t size) +{ + // Optimistically construct a 'RecvRequest' and future. + Owned<RecvRequest> request(new RecvRequest(data, size)); + std::weak_ptr<LibeventSSLSocketImpl> weak_self(shared(this)); + + // If the user of the future decides to 'discard', then we want to + // test whether the request was already satisfied. + // We capture a 'weak_ptr' to 'this' (as opposed to a 'shared_ptr') + // because the socket could be destroyed before this lambda is + // executed. If we used a 'shared_ptr' then this lambda could extend + // the life-time of 'this' unnecessarily. + Future<size_t> future = request->promise.future() + .onDiscard([weak_self]() { + // Extend the life-time of 'this' through the execution of the + // lambda in the event loop. Note: The 'self' needs to be + // explicitly captured because we're not using it in the body of + // the lambda. 
We can use a 'shared_ptr' because + // run_in_event_loop is guaranteed to execute. + std::shared_ptr<LibeventSSLSocketImpl> self(weak_self.lock()); + + if (self != nullptr) { + run_in_event_loop( + [self]() { + CHECK(__in_event_loop__); + CHECK(self); + + Owned<RecvRequest> request; + + synchronized (self->lock) { + std::swap(request, self->recv_request); + } + + // Only discard if the request hasn't already been + // satisfied. + if (request.get() != nullptr) { + // Discard the promise outside of the object lock as + // the callbacks can be expensive. + request->promise.discard(); + } + }, + DISALLOW_SHORT_CIRCUIT); + } + }); + + // Assign 'recv_request' under lock, fail on error. + synchronized (lock) { + if (recv_request.get() != nullptr) { + return Failure("Socket is already receiving"); + } + std::swap(request, recv_request); + } + + // Extend the life-time of 'this' through the execution of the + // lambda in the event loop. Note: The 'self' needs to be explicitly + // captured because we're not using it in the body of the lambda. We + // can use a 'shared_ptr' because run_in_event_loop is guaranteed to + // execute. + auto self = shared(this); + + run_in_event_loop( + [self]() { + CHECK(__in_event_loop__); + CHECK(self); + + bool recv = false; + + // We check to see if 'recv_request' is null. It would be null + // if a 'discard' happened before this lambda was executed. + synchronized (self->lock) { + recv = self->recv_request.get() != nullptr; + } + + // Only try to read existing data from the bufferevent if the + // request has not already been discarded. + if (recv) { + synchronized (self->bev) { + evbuffer* input = bufferevent_get_input(self->bev); + size_t length = evbuffer_get_length(input); + + // If there is already data in the buffer or an EOF has + // been received, fulfill the 'recv_request' by calling + // 'recv_callback()'. Otherwise do nothing and wait for + // the 'recv_callback' to run when we receive data over + // the network. 
+ if (length > 0 || self->received_eof) { + self->recv_callback(); + } + } + } + }, + DISALLOW_SHORT_CIRCUIT); + + return future; +} + + +Future<size_t> LibeventSSLSocketImpl::send(const char* data, size_t size) +{ + // Optimistically construct a 'SendRequest' and future. + Owned<SendRequest> request(new SendRequest(size)); + Future<size_t> future = request->promise.future(); + + // We don't add an 'onDiscard' continuation to send because we can + // not accurately detect how many bytes have been sent. Once we pass + // the data to the bufferevent, there is the possibility that parts + // of it have been sent. Another reason is that if we send partial + // messages (discard only a part of the data), then it is likely + // that the receiving end will fail parsing the message. + + // Assign 'send_request' under lock, fail on error. + synchronized (lock) { + if (send_request.get() != nullptr) { + return Failure("Socket is already sending"); + } + std::swap(request, send_request); + } + + evbuffer* buffer = CHECK_NOTNULL(evbuffer_new()); + + int result = evbuffer_add(buffer, data, size); + CHECK_EQ(0, result); + + // Extend the life-time of 'this' through the execution of the + // lambda in the event loop. Note: The 'self' needs to be explicitly + // captured because we're not using it in the body of the lambda. We + // can use a 'shared_ptr' because run_in_event_loop is guaranteed to + // execute. + auto self = shared(this); + + run_in_event_loop( + [self, buffer]() { + CHECK(__in_event_loop__); + CHECK(self); + + // Check if the socket is closed or the write end has + // encountered an error in the interim (i.e. we received + // a BEV_EVENT_ERROR with BEV_EVENT_WRITING). 
+ bool write = false; + + synchronized (self->lock) { + if (self->send_request.get() != nullptr) { + write = true; + } + } + + if (write) { + int result = bufferevent_write_buffer(self->bev, buffer); + CHECK_EQ(0, result); + } + + evbuffer_free(buffer); + }, + DISALLOW_SHORT_CIRCUIT); + + return future; +} + + +Future<size_t> LibeventSSLSocketImpl::sendfile( + int_fd fd, + off_t offset, + size_t size) +{ + // Optimistically construct a 'SendRequest' and future. + Owned<SendRequest> request(new SendRequest(size)); + Future<size_t> future = request->promise.future(); + + // Assign 'send_request' under lock, fail on error. + synchronized (lock) { + if (send_request.get() != nullptr) { + return Failure("Socket is already sending"); + } + std::swap(request, send_request); + } + + // Duplicate the file descriptor because Libevent will take ownership + // and control the lifecycle separately. + // + // TODO(josephw): We can avoid duplicating the file descriptor in + // future versions of Libevent. In Libevent versions 2.1.2 and later, + // we may use `evbuffer_file_segment_new` and `evbuffer_add_file_segment` + // instead of `evbuffer_add_file`. + Try<int_fd> dup = os::dup(fd); + if (dup.isError()) { + return Failure(dup.error()); + } + + // NOTE: This is *not* an `int_fd` because `libevent` requires a CRT + // integer file descriptor, which we allocate and then use + // exclusively here. +#ifdef __WINDOWS__ + int owned_fd = dup->crt(); + // The `os::cloexec` and `os::nonblock` functions do nothing on + // Windows, and cannot be called because they take `int_fd`. +#else + int owned_fd = dup.get(); + + // Set the close-on-exec flag. + Try<Nothing> cloexec = os::cloexec(owned_fd); + if (cloexec.isError()) { + os::close(owned_fd); + return Failure( + "Failed to set close-on-exec on duplicated file descriptor: " + + cloexec.error()); + } + + // Make the file descriptor non-blocking. 
+ Try<Nothing> nonblock = os::nonblock(owned_fd); + if (nonblock.isError()) { + os::close(owned_fd); + return Failure( + "Failed to make duplicated file descriptor non-blocking: " + + nonblock.error()); + } +#endif // __WINDOWS__ + + // Extend the life-time of 'this' through the execution of the + // lambda in the event loop. Note: The 'self' needs to be explicitly + // captured because we're not using it in the body of the lambda. We + // can use a 'shared_ptr' because run_in_event_loop is guaranteed to + // execute. + auto self = shared(this); + + run_in_event_loop( + [self, owned_fd, offset, size]() { + CHECK(__in_event_loop__); + CHECK(self); + + // Check if the socket is closed or the write end has + // encountered an error in the interim (i.e. we received + // a BEV_EVENT_ERROR with BEV_EVENT_WRITING). + bool write = false; + + synchronized (self->lock) { + if (self->send_request.get() != nullptr) { + write = true; + } + } + + if (write) { + // NOTE: `evbuffer_add_file` will take ownership of the file + // descriptor and close it after it has finished reading it. + int result = evbuffer_add_file( + bufferevent_get_output(self->bev), + owned_fd, + offset, + size); + CHECK_EQ(0, result); + } else { +#ifdef __WINDOWS__ + // NOTE: `os::close()` on Windows is not compatible with CRT + // file descriptors, only `HANDLE` and `SOCKET` types. + ::_close(owned_fd); +#else + os::close(owned_fd); +#endif // __WINDOWS__ + } + }, + DISALLOW_SHORT_CIRCUIT); + + return future; +} + + +Try<Nothing> LibeventSSLSocketImpl::listen(int backlog) +{ + if (listener != nullptr) { + return Error("Socket is already listening"); + } + + CHECK(bev == nullptr); + + // NOTE: Accepted sockets are nonblocking by default in libevent, but + // can be set to block via the `LEV_OPT_LEAVE_SOCKETS_BLOCKING` + // flag for `evconnlistener_new`. 
+ listener = evconnlistener_new( + base, + [](evconnlistener* listener, + evutil_socket_t socket, + sockaddr* addr, + int addr_length, + void* arg) { + CHECK(__in_event_loop__); + + std::weak_ptr<LibeventSSLSocketImpl>* handle = + reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>( + CHECK_NOTNULL(arg)); + + std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock()); + +#ifndef __WINDOWS__ + // NOTE: Passing the flag `LEV_OPT_CLOSE_ON_EXEC` into + // `evconnlistener_new` would atomically set `SOCK_CLOEXEC` + // on the accepted socket. However, this flag is not supported + // in the minimum recommended version of libevent (2.0.22). + Try<Nothing> cloexec = os::cloexec(socket); + if (cloexec.isError()) { + VLOG(2) << "Failed to accept, cloexec: " << cloexec.error(); + + // Propagate the error through the listener's `accept_queue`. + if (impl != nullptr) { + impl->accept_queue.put( + Failure("Failed to accept, cloexec: " + cloexec.error())); + } + + os::close(socket); + return; + } +#endif // __WINDOWS__ + + if (impl != nullptr) { + Try<net::IP> ip = net::IP::create(*addr); + if (ip.isError()) { + VLOG(2) << "Could not convert sockaddr to net::IP: " << ip.error(); + } + + // We pass the 'listener' into the 'AcceptRequest' because + // this function could be executed before 'this->listener' + // is set. + AcceptRequest* request = + new AcceptRequest( + // NOTE: The `int_fd` must be explicitly constructed + // to avoid the `intptr_t` being casted to an `int`, + // resulting in a `HANDLE` instead of a `SOCKET` on + // Windows. + int_fd(socket), + listener, + ip.isSome() ? Option<net::IP>(ip.get()) : None()); + + impl->accept_callback(request); + } + }, + event_loop_handle, + LEV_OPT_REUSEABLE, + backlog, + s); + + if (listener == nullptr) { + return Error("Failed to listen on socket"); + } + + // TODO(jmlvanre): attach an error callback. 
+ + return Nothing(); +} + + +Future<std::shared_ptr<SocketImpl>> LibeventSSLSocketImpl::accept() +{ + // Note that due to MESOS-8448, when the caller discards, it's + // possible that we pull an accepted socket out of the queue but + // drop it when `.then` transitions to discarded rather than + // executing the continuation. This is currently acceptable since + // callers only discard when they're breaking their accept loop. + // However, from an API perspective, we shouldn't be dropping + // the socket on the floor. + // + // We explicitly specify the return type to avoid a type deduction + // issue in some versions of clang. See MESOS-2943. + return accept_queue.get() + .then([](const Future<std::shared_ptr<SocketImpl>>& impl) + -> Future<std::shared_ptr<SocketImpl>> { + CHECK(!impl.isPending()); + return impl; + }); +} + + +// Only runs in event loop. Peeks at the first bytes of a freshly +// accepted socket to decide whether to continue with the SSL +// handshake or to downgrade to a non-SSL `PollSocketImpl`. +void LibeventSSLSocketImpl::peek_callback( + evutil_socket_t fd, + short what, + void* arg) +{ + CHECK(__in_event_loop__); + + CHECK(what & EV_READ); + char data[6]; + + // Try to peek the first 6 bytes of the message. + ssize_t size = ::recv(fd, data, 6, MSG_PEEK); + + // Based on the function 'ssl23_get_client_hello' in openssl, we + // test whether to dispatch to the SSL or non-SSL based accept based + // on the following rules: + // 1. If there are fewer than 3 bytes: non-SSL. + // 2. If the 1st bit of the 1st byte is set AND the 3rd byte is + // equal to SSL2_MT_CLIENT_HELLO: SSL. + // 3. If all 6 bytes were peeked AND the 1st byte is equal to + // SSL3_RT_HANDSHAKE AND the 2nd byte is equal to + // SSL3_VERSION_MAJOR AND the 6th byte is equal to + // SSL3_MT_CLIENT_HELLO: SSL. + // 4. Otherwise: non-SSL. + + // For an ascii based protocol to falsely get dispatched to SSL it + // needs to: + // 1. Start with an invalid ascii character (0x80). + // 2. OR have the first 2 characters be a SYN followed by ETX, and + // then the 6th character be SOH.
+ // These conditions clearly do not constitute valid HTTP requests, + // and are unlikely to collide with other existing protocols. + + bool ssl = false; // Default to rule 4. + + // NOTE: Only the first `size` bytes of `data` were written by + // `recv` (and `size` is -1 on error), so every rule below must + // check `size` before reading the corresponding index. Otherwise + // we would be dispatching on uninitialized stack bytes. + if (size < 3) { // Rule 1. + ssl = false; + } else if ((data[0] & 0x80) && data[2] == SSL2_MT_CLIENT_HELLO) { // Rule 2. + ssl = true; + } else if (size >= 6 && + data[0] == SSL3_RT_HANDSHAKE && + data[1] == SSL3_VERSION_MAJOR && + data[5] == SSL3_MT_CLIENT_HELLO) { // Rule 3. + ssl = true; + } + + AcceptRequest* request = reinterpret_cast<AcceptRequest*>(arg); + + // We call 'event_free()' here because it ensures the event is made + // non-pending and inactive before it gets deallocated. + event_free(request->peek_event); + request->peek_event = nullptr; + + if (ssl) { + accept_SSL_callback(request); + } else { + // Downgrade to a non-SSL socket implementation. + // + // NOTE: The `int_fd` must be explicitly constructed to avoid the + // `intptr_t` being casted to an `int`, resulting in a `HANDLE` + // instead of a `SOCKET` on Windows. + Try<std::shared_ptr<SocketImpl>> impl = PollSocketImpl::create(int_fd(fd)); + if (impl.isError()) { + request->promise.fail(impl.error()); + } else { + request->promise.set(impl.get()); + } + + delete request; + } +} + + +void LibeventSSLSocketImpl::accept_callback(AcceptRequest* request) +{ + CHECK(__in_event_loop__); + + Queue<Future<std::shared_ptr<SocketImpl>>> accept_queue_ = accept_queue; + + // After the socket is accepted, it must complete the SSL + // handshake (or be downgraded to a regular socket) before + // we put it in the queue of connected sockets. + request->promise.future() + .onAny([accept_queue_](Future<std::shared_ptr<SocketImpl>> impl) mutable { + accept_queue_.put(impl); + }); + + // If we support downgrading the connection, first wait for this + // socket to become readable. We will then MSG_PEEK it to test + // whether we want to dispatch as SSL or non-SSL.
+ if (openssl::flags().support_downgrade) { + request->peek_event = event_new( + base, + request->socket, + EV_READ, + &LibeventSSLSocketImpl::peek_callback, + request); + event_add(request->peek_event, nullptr); + } else { + accept_SSL_callback(request); + } +} + + +void LibeventSSLSocketImpl::accept_SSL_callback(AcceptRequest* request) +{ + CHECK(__in_event_loop__); + + // Set up SSL object. + SSL* ssl = SSL_new(openssl::context()); + if (ssl == nullptr) { + request->promise.fail("Accept failed, SSL_new"); + delete request; + return; + } + + // We use 'request->listener' because 'this->listener' may not have + // been set by the time this function is executed. See comment in + // the lambda for evconnlistener_new in + // 'LibeventSSLSocketImpl::listen'. + event_base* ev_base = evconnlistener_get_base(request->listener); + + // Construct the bufferevent in the accepting state. + bufferevent* bev = bufferevent_openssl_socket_new( + ev_base, + request->socket, + ssl, + BUFFEREVENT_SSL_ACCEPTING, + BEV_OPT_THREADSAFE); + + if (bev == nullptr) { + request->promise.fail("Accept failed: bufferevent_openssl_socket_new"); + SSL_free(ssl); + delete request; + return; + } + + bufferevent_setcb( + bev, + nullptr, + nullptr, + [](bufferevent* bev, short events, void* arg) { + // This handles error states or 'BEV_EVENT_CONNECTED' events + // and satisfies the promise by constructing a new socket if + // the connection was successfuly established. + CHECK(__in_event_loop__); + + AcceptRequest* request = + reinterpret_cast<AcceptRequest*>(CHECK_NOTNULL(arg)); + + if (events & BEV_EVENT_EOF) { + request->promise.fail("Failed accept: connection closed"); + } else if (events & BEV_EVENT_CONNECTED) { + // We will receive a 'CONNECTED' state on an accepting socket + // once the connection is established. Time to do + // post-verification. First, we need to determine the peer + // hostname. 
+ Option<string> peer_hostname = None(); + + if (request->ip.isSome()) { + Try<string> hostname = net::getHostname(request->ip.get()); + + if (hostname.isError()) { + VLOG(2) << "Could not determine hostname of peer: " + << hostname.error(); + } else { + VLOG(2) << "Accepting from " << hostname.get(); + peer_hostname = hostname.get(); + } + } + + SSL* ssl = bufferevent_openssl_get_ssl(bev); + CHECK_NOTNULL(ssl); + + Try<Nothing> verify = + openssl::verify(ssl, peer_hostname, request->ip); + + if (verify.isError()) { + VLOG(1) << "Failed accept, verification error: " << verify.error(); + request->promise.fail(verify.error()); + SSL_free(ssl); + bufferevent_free(bev); + // TODO(jmlvanre): Clean up for readability. Consider RAII + // or constructing the impl earlier. + CHECK(request->socket >= 0); + Try<Nothing> close = os::close(request->socket); + if (close.isError()) { + LOG(FATAL) + << "Failed to close socket " << stringify(request->socket) + << ": " << close.error(); + } + delete request; + return; + } + + auto impl = std::shared_ptr<LibeventSSLSocketImpl>( + new LibeventSSLSocketImpl( + request->socket, + bev, + std::move(peer_hostname))); + + // See comment at 'initialize' declaration for why we call + // this. + impl->initialize(); + + // We have to wait till after 'initialize()' is invoked for + // event_loop_handle to be valid as a callback argument for + // the callbacks. 
+ bufferevent_setcb( + CHECK_NOTNULL(impl->bev), + &LibeventSSLSocketImpl::recv_callback, + &LibeventSSLSocketImpl::send_callback, + &LibeventSSLSocketImpl::event_callback, + CHECK_NOTNULL(impl->event_loop_handle)); + + request->promise.set(std::dynamic_pointer_cast<SocketImpl>(impl)); + } else if (events & BEV_EVENT_ERROR) { + std::ostringstream stream; + if (EVUTIL_SOCKET_ERROR() != 0) { + stream << evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()); + } else { + char buffer[1024] = {}; + unsigned long error = bufferevent_get_openssl_error(bev); + ERR_error_string_n(error, buffer, sizeof(buffer)); + stream << buffer; + } + + // Fail the accept request and log the error. + VLOG(1) << "Socket error: " << stream.str(); + + SSL* ssl = bufferevent_openssl_get_ssl(CHECK_NOTNULL(bev)); + SSL_free(ssl); + bufferevent_free(bev); + + // TODO(jmlvanre): Clean up for readability. Consider RAII + // or constructing the impl earlier. + CHECK(request->socket >= 0); + Try<Nothing> close = os::close(request->socket); + if (close.isError()) { + LOG(FATAL) + << "Failed to close socket " << stringify(request->socket) + << ": " << close.error(); + } + request->promise.fail( + "Failed accept: connection error: " + stream.str()); + } + + delete request; + }, + request); +} + +} // namespace internal { +} // namespace network { +} // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/posix/libevent/libevent_ssl_socket.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/posix/libevent/libevent_ssl_socket.hpp b/3rdparty/libprocess/src/posix/libevent/libevent_ssl_socket.hpp new file mode 100644 index 0000000..6ef5a86 --- /dev/null +++ b/3rdparty/libprocess/src/posix/libevent/libevent_ssl_socket.hpp @@ -0,0 +1,198 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License

#ifndef __LIBEVENT_SSL_SOCKET_HPP__
#define __LIBEVENT_SSL_SOCKET_HPP__

#include <event2/buffer.h>
#include <event2/bufferevent_ssl.h>
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/util.h>

#include <atomic>
#include <memory>

#include <process/queue.hpp>
#include <process/socket.hpp>

namespace process {
namespace network {
namespace internal {

// A 'SocketImpl' whose 'kind()' is 'SocketImpl::Kind::SSL'. It is
// built on libevent: traffic goes through a 'bufferevent' and
// listening goes through an 'evconnlistener'. Accepted connections
// are only handed out by 'accept()' after they have completed the SSL
// handshake or have been downgraded to a non-SSL socket.
class LibeventSSLSocketImpl : public SocketImpl
{
public:
  // See 'Socket::create()'.
  static Try<std::shared_ptr<SocketImpl>> create(int_fd s);

  LibeventSSLSocketImpl(int_fd _s);

  ~LibeventSSLSocketImpl() override;

  // Implement 'SocketImpl' interface.
  Future<Nothing> connect(const Address& address) override;
  Future<size_t> recv(char* data, size_t size) override;
  // Send does not currently support discard. See implementation.
  Future<size_t> send(const char* data, size_t size) override;
  Future<size_t> sendfile(int_fd fd, off_t offset, size_t size) override;
  Try<Nothing> listen(int backlog) override;
  Future<std::shared_ptr<SocketImpl>> accept() override;
  SocketImpl::Kind kind() const override { return SocketImpl::Kind::SSL; }

  // Shuts down the socket.
  //
  // NOTE: Although this method accepts an integer which specifies the
  // shutdown mode, this parameter is ignored because SSL connections
  // do not have a concept of read/write-only shutdown. If either end
  // of the socket is closed, then the futures of any outstanding read
  // requests will be completed (possibly as failures).
  Try<Nothing, SocketError> shutdown(int how) override;

  // We need a post-initializer because 'shared_from_this()' is not
  // valid until the constructor has finished.
  void initialize();

private:
  // A set of helper functions that transitions an accepted socket to
  // an SSL connected socket. With the libevent-openssl library, once
  // we return from the 'accept_callback()' which is scheduled by
  // 'listen' then we still need to wait for the 'BEV_EVENT_CONNECTED'
  // state before we know the SSL connection has been established.
  //
  // An 'AcceptRequest' carries the state of one accepted connection
  // through those helpers. It is heap allocated by the listener
  // callback and deleted by whichever helper finishes with it.
  struct AcceptRequest
  {
    AcceptRequest(
        int_fd _socket,
        evconnlistener* _listener,
        const Option<net::IP>& _ip)
      : peek_event(nullptr),
        listener(_listener),
        socket(_socket),
        ip(_ip) {}

    // Read event used to peek at the first bytes of the connection
    // when downgrade is supported. It is created in
    // 'accept_callback()' and freed (and reset to nullptr) in
    // 'peek_callback()'.
    event* peek_event;

    // Set with the resulting socket or failed; 'accept_callback()'
    // forwards its future into 'accept_queue'.
    Promise<std::shared_ptr<SocketImpl>> promise;

    // The listener that accepted 'socket'. 'accept_SSL_callback()'
    // uses it to look up the event base.
    evconnlistener* listener;

    // The accepted socket.
    int_fd socket;

    // The IP of the peer, or none if the peer's 'sockaddr' could not
    // be converted. Used for the hostname lookup and verification in
    // 'accept_SSL_callback()'.
    Option<net::IP> ip;
  };

  // NOTE(review): presumably the state of an outstanding 'recv()'
  // (destination buffer, its size and the promise for the number of
  // bytes read); the code that uses it is not visible here, confirm
  // against the implementation.
  struct RecvRequest
  {
    RecvRequest(char* _data, size_t _size)
      : data(_data), size(_size) {}
    Promise<size_t> promise;
    char* data;
    size_t size;
  };

  // NOTE(review): presumably the state of an outstanding 'send()' or
  // 'sendfile()'; confirm against the implementation.
  struct SendRequest
  {
    SendRequest(size_t _size)
      : size(_size) {}
    Promise<size_t> promise;
    size_t size;
  };

  // NOTE(review): presumably the state of an outstanding 'connect()';
  // confirm against the implementation.
  struct ConnectRequest
  {
    Promise<Nothing> promise;
  };

  // This is a private constructor used by the accept helper
  // functions.
  LibeventSSLSocketImpl(
      int_fd _s,
      bufferevent* bev,
      Option<std::string>&& peer_hostname);

  // This is called when the equivalent of 'accept' returns. The role
  // of this function is to set up the SSL object and bev. If we
  // support both SSL and non-SSL traffic simultaneously then we first
  // wait for data to be ready and test the hello handshake to
  // disambiguate between the kinds of traffic.
  void accept_callback(AcceptRequest* request);

  // This is the continuation of 'accept_callback' that handles an SSL
  // connection.
  static void accept_SSL_callback(AcceptRequest* request);

  // This function peeks at the data on an accepted socket to see if
  // there is an SSL handshake or not. It then dispatches to the
  // SSL handling function or creates a non-SSL socket.
  static void peek_callback(evutil_socket_t fd, short what, void* arg);

  // The following are function pairs of static functions to member
  // functions. The static functions test and hold the weak pointer to
  // the socket before calling the member functions. This protects
  // against the socket being destroyed before the event loop calls
  // the callbacks.
  static void recv_callback(bufferevent* bev, void* arg);
  void recv_callback();

  static void send_callback(bufferevent* bev, void* arg);
  void send_callback();

  static void event_callback(bufferevent* bev, short events, void* arg);
  void event_callback(short events);

  // The bufferevent of this socket. For accepted sockets it is the
  // one created during the handshake and passed to the private
  // constructor.
  bufferevent* bev;

  // The connection listener assigned by 'listen()'.
  evconnlistener* listener;

  // Protects the following instance variables.
  std::atomic_flag lock = ATOMIC_FLAG_INIT;
  Owned<RecvRequest> recv_request;
  Owned<SendRequest> send_request;
  Owned<ConnectRequest> connect_request;

  // Indicates whether or not an EOF has been received on this socket.
  // Our accesses to this member are not synchronized because they all
  // occur within the event loop, which runs on a single thread.
  bool received_eof = false;

  // This is a weak pointer to 'this', i.e., ourselves, this class
  // instance. We need this for our event loop callbacks because it's
  // possible that we'll actually want to cleanup this socket impl
  // before the event loop callback gets executed ... and we'll check
  // in each event loop callback whether or not this weak_ptr is valid
  // by attempting to upgrade it to shared_ptr. It is the
  // responsibility of the event loop through the deferred lambda in
  // the destructor to clean up this pointer.
  // 1) It is a 'weak_ptr' as opposed to a 'shared_ptr' because we
  // want to test whether the object is still around from within the
  // event loop. If it was a 'shared_ptr' then we would be
  // contributing to the lifetime of the object and would no longer be
  // able to test the lifetime correctly.
  // 2) This is a pointer to a 'weak_ptr' so that we can pass this
  // through to the event loop through the C-interface. We need access
  // to the 'weak_ptr' from outside the object (in the event loop) to
  // test if the object is still alive. By maintaining this 'weak_ptr'
  // on the heap we can be sure it is safe to access from the
  // event loop until it is destroyed.
  std::weak_ptr<LibeventSSLSocketImpl>* event_loop_handle;

  // This queue stores accepted sockets that are considered connected
  // (either the SSL handshake has completed or the socket has been
  // downgraded). The 'accept()' call returns sockets from this queue.
  // We wrap the socket in a 'Future' so that we can pass failures or
  // discards through.
  Queue<Future<std::shared_ptr<SocketImpl>>> accept_queue;

  // The hostname of the peer, if known. For accepted sockets this is
  // the result of the hostname lookup done during the handshake.
  Option<std::string> peer_hostname;

  // NOTE(review): presumably the IP of the peer; where it is set is
  // not visible here, confirm against the implementation.
  Option<net::IP> peer_ip;
};

} // namespace internal {
} // namespace network {
} // namespace process {

#endif // __LIBEVENT_SSL_SOCKET_HPP__