net/loolnb.cpp | 250 ++++++++++++++++------------------------------------ net/socket.hpp | 273 +++++++++++++++++++++++++++++++-------------------------- 2 files changed, 231 insertions(+), 292 deletions(-)
New commits: commit ae35938a9dac2fe632b32b072a9cb5469bdbbdb7 Author: Michael Meeks <michael.me...@collabora.com> Date: Wed Feb 15 14:48:48 2017 +0000 De-templatize and simplify. diff --git a/net/loolnb.cpp b/net/loolnb.cpp index da7b17a..bffc684 100644 --- a/net/loolnb.cpp +++ b/net/loolnb.cpp @@ -52,147 +52,6 @@ public: } }; -/// Handles non-blocking socket event polling. -/// Only polls on N-Sockets and invokes callback and -/// doesn't manage buffers or client data. -/// Note: uses poll(2) since it has very good performance -/// compared to epoll up to a few hundred sockets and -/// doesn't suffer select(2)'s poor API. Since this will -/// be used per-document we don't expect to have several -/// hundred users on same document to suffer poll(2)'s -/// scalability limit. Meanwhile, epoll(2)'s high -/// overhead to adding/removing sockets is not helpful. -template <typename T> -class SocketPoll -{ -public: - SocketPoll() - { - // Create the wakeup fd. - if (::pipe2(_wakeup, O_CLOEXEC | O_NONBLOCK) == -1) - { - // FIXME: Can't have wakeup pipe, should we exit? - // FIXME: running out of sockets should be a case we handle elegantly here - and also in our accept / ClientSocket creation I guess. - _wakeup[0] = -1; - _wakeup[1] = -1; - } - } - - ~SocketPoll() - { - ::close(_wakeup[0]); - ::close(_wakeup[1]); - } - - /// Poll the sockets for available data to read or buffer to write. - void poll(const int timeoutMs, const std::function<bool(const std::shared_ptr<T>&, const int)>& handler) - { - const size_t size = _pollSockets.size(); - - // The events to poll on change each spin of the loop. - setupPollFds(); - - int rc; - do - { - rc = ::poll(&_pollFds[0], size + 1, timeoutMs); - } - while (rc < 0 && errno == EINTR); - - // Fire the callback and remove dead fds. - for (int i = static_cast<int>(size) - 1; i >= 0; --i) - { - if (_pollFds[i].revents) - { - if (!handler(_pollSockets[i], _pollFds[i].revents)) - { - std::cout << "Removing: " << _pollFds[i].fd << std::endl; - _pollSockets.erase(_pollSockets.begin() + i); - // Don't remove from pollFds; we'll recreate below. - } - } - } - - // Process the wakeup pipe (always the last entry). - if (_pollFds[size].revents) - { - // Add new sockets first. - addNewSocketsToPoll(); - - // Clear the data. - int dump; - if (::read(_wakeup[0], &dump, sizeof(dump)) == -1) - { - // Nothing to do. - } - } - } - - /// Insert a new socket to be polled. - /// Sockets are removed only when the handler return false. - void insertNewSocket(const std::shared_ptr<T>& newSocket) - { - std::lock_guard<std::mutex> lock(_mutex); - - _newSockets.emplace_back(newSocket); - - // wakeup the main-loop. - if (::write(_wakeup[1], "w", 1) == -1) - { - // wakeup pipe is already full. - assert(errno == EAGAIN || errno == EWOULDBLOCK); - } - } - -private: - - /// Add the new sockets to list of those to poll. - void addNewSocketsToPoll() - { - std::lock_guard<std::mutex> lock(_mutex); - - // Copy the new sockets over and clear. - _pollSockets.insert(_pollSockets.end(), _newSockets.begin(), _newSockets.end()); - _newSockets.clear(); - } - - void removeSocketFromPoll(const std::shared_ptr<T>& socket) - { - _pollSockets.erase(_pollSockets.find(socket)); - } - - /// Initialize the poll fds array with the right events - void setupPollFds() - { - const size_t size = _pollSockets.size(); - - _pollFds.resize(size + 1); // + wakeup pipe - - for (size_t i = 0; i < size; ++i) - { - _pollFds[i].fd = _pollSockets[i]->getFD(); - _pollFds[i].events = _pollSockets[i]->getPollEvents(); - _pollFds[i].revents = 0; - } - - // Add the read-end of the wake pipe. - _pollFds[size].fd = _wakeup[0]; - _pollFds[size].events = POLLIN; - _pollFds[size].revents = 0; - } - -private: - /// main-loop wakeup pipe - int _wakeup[2]; - /// The sockets we're controlling - std::vector<std::shared_ptr<T>> _pollSockets; - /// Protects _newSockets - std::mutex _mutex; - std::vector<std::shared_ptr<T>> _newSockets; - /// The fds to poll. - std::vector<pollfd> _pollFds; -}; - /// Generic thread class. class Thread { @@ -229,10 +88,78 @@ private: Poco::Net::SocketAddress addr("127.0.0.1", PortNumber); -void server(SocketPoll<SimpleResponseClient>& poller) +/// A non-blocking, streaming socket. +class ServerSocket : public Socket +{ + SocketPoll& _clientPoller; +public: + ServerSocket(SocketPoll& clientPoller) + : _clientPoller(clientPoller) + { + } + + /// Binds to a local address (Servers only). + /// Does not retry on error. + /// Returns true on success only. + bool bind(const Poco::Net::SocketAddress& address) + { + // Enable address reuse to avoid stalling after + // recycling, when previous socket is TIME_WAIT. + //TODO: Might be worth refactoring out. + const int reuseAddress = 1; + constexpr unsigned int len = sizeof(reuseAddress); + ::setsockopt(getFD(), SOL_SOCKET, SO_REUSEADDR, &reuseAddress, len); + + const int rc = ::bind(getFD(), address.addr(), address.length()); + return (rc == 0); + } + + /// Listen to incoming connections (Servers only). + /// Does not retry on error. + /// Returns true on success only. + bool listen(const int backlog = 64) + { + const int rc = ::listen(getFD(), backlog); + return (rc == 0); + } + + /// Accepts an incoming connection (Servers only). + /// Does not retry on error. + /// Returns a valid Socket shared_ptr on success only. + template <typename T> + std::shared_ptr<T> accept() + { + // Accept a connection (if any) and set it to non-blocking. + // We don't care about the client's address, so ignored. + const int rc = ::accept4(getFD(), nullptr, nullptr, SOCK_NONBLOCK); + return std::shared_ptr<T>(rc != -1 ? new T(rc) : nullptr); + } + + int getPollEvents() override + { + return POLLIN; + } + + HandleResult handlePoll( int /* events */ ) override + { + std::shared_ptr<SimpleResponseClient> clientSocket = accept<SimpleResponseClient>(); + if (!clientSocket) + { + const std::string msg = "Failed to accept. (errno: "; + throw std::runtime_error(msg + std::strerror(errno) + ")"); + } + + std::cout << "Accepted client #" << clientSocket->getFD() << std::endl; + _clientPoller.insertNewSocket(clientSocket); + + return Socket::HandleResult::CONTINUE; + } +}; + +void server(SocketPoll& clientPoller) { // Start server. - auto server = std::make_shared<ServerSocket>(); + auto server = std::make_shared<ServerSocket>(clientPoller); if (!server->bind(addr)) { const std::string msg = "Failed to bind. (errno: "; @@ -245,51 +172,30 @@ void server(SocketPoll<SimpleResponseClient>& poller) throw std::runtime_error(msg + std::strerror(errno) + ")"); } + SocketPoll serverPoll; + + serverPoll.insertNewSocket(server); + std::cout << "Listening." << std::endl; for (;;) { - if (server->pollRead(30000)) - { - std::shared_ptr<SimpleResponseClient> clientSocket = server->accept<SimpleResponseClient>(); - if (!clientSocket) - { - const std::string msg = "Failed to accept. (errno: "; - throw std::runtime_error(msg + std::strerror(errno) + ")"); - } - - std::cout << "Accepted client #" << clientSocket->getFD() << std::endl; - poller.insertNewSocket(clientSocket); - } + serverPoll.poll(30000); } } /// Poll client sockets and do IO. -void pollAndComm(SocketPoll<SimpleResponseClient>& poller, std::atomic<bool>& stop) +void pollAndComm(SocketPoll& poller, std::atomic<bool>& stop) { while (!stop) { - poller.poll(5000, [](const std::shared_ptr<SimpleResponseClient>& socket, const int events) - { - bool closeSocket = false; - - if (events & POLLIN) - closeSocket = !socket->readIncomingData(); - - if (events & POLLOUT) - socket->writeOutgoingData(); - - if (events & (POLLHUP | POLLERR | POLLNVAL)) - closeSocket = true; - - return !closeSocket; - }); + poller.poll(5000); } } int main(int, const char**) { // Used to poll client sockets. - SocketPoll<SimpleResponseClient> poller; + SocketPoll poller; // Start the client polling thread. Thread threadPoll([&poller](std::atomic<bool>& stop) diff --git a/net/socket.hpp b/net/socket.hpp index 17a3d9d..449f6e0 100644 --- a/net/socket.hpp +++ b/net/socket.hpp @@ -29,7 +29,7 @@ public: { } - ~Socket() + virtual ~Socket() { //TODO: Should we shutdown here or up to the client? @@ -40,6 +40,14 @@ public: // Returns the OS native socket fd. int getFD() const { return _fd; } + /// Return a mask of events we should be polling for + virtual int getPollEvents() = 0; + + /// Handle results of events returned from poll + enum HandleResult { CONTINUE, SOCKET_CLOSED }; + virtual HandleResult handlePoll( int events ) = 0; + + /// Sets the send buffer in size bytes. /// Must be called before accept or connect. /// Note: TCP will allocate twice this size for admin purposes, @@ -106,88 +114,164 @@ public: return rc; } - /// Poll the socket for either read, write, or both. - /// Returns -1 on failure/error (query socket error), 0 for timeout, - /// otherwise, depending on events, the respective bits set. - int poll(const int timeoutMs, const int events = POLLIN | POLLOUT) +protected: + + /// Construct based on an existing socket fd. + /// Used by accept() only. + Socket(const int fd) : + _fd(fd) { - // Use poll(2) as it has lower overhead for up to - // a few hundred sockets compared to epoll(2). - // Also it has a more intuitive API and portable. - pollfd pollfd; - memset(&pollfd, 0, sizeof(pollfd)); + } - pollfd.fd = getFD(); - pollfd.events |= events; +private: + const int _fd; +}; - int rc; - do + +/// Handles non-blocking socket event polling. +/// Only polls on N-Sockets and invokes callback and +/// doesn't manage buffers or client data. +/// Note: uses poll(2) since it has very good performance +/// compared to epoll up to a few hundred sockets and +/// doesn't suffer select(2)'s poor API. Since this will +/// be used per-document we don't expect to have several +/// hundred users on same document to suffer poll(2)'s +/// scalability limit. Meanwhile, epoll(2)'s high +/// overhead to adding/removing sockets is not helpful. +class SocketPoll +{ +public: + SocketPoll() + { + // Create the wakeup fd. + if (::pipe2(_wakeup, O_CLOEXEC | O_NONBLOCK) == -1) { - // Technically, on retrying we should wait - // the _remaining_ time, alas simplicity wins. - rc = ::poll(&pollfd, 1, timeoutMs); + // FIXME: Can't have wakeup pipe, should we exit? + // FIXME: running out of sockets should be a case we handle elegantly here - and also in our accept / ClientSocket creation I guess. + _wakeup[0] = -1; + _wakeup[1] = -1; } - while (rc < 0 && errno == EINTR); + } + + ~SocketPoll() + { + ::close(_wakeup[0]); + ::close(_wakeup[1]); + } + + /// Poll the sockets for available data to read or buffer to write. + void poll(const int timeoutMs) + { + const size_t size = _pollSockets.size(); - if (rc <= 0) + // The events to poll on change each spin of the loop. + setupPollFds(); + + int rc; + do { - return rc; + rc = ::poll(&_pollFds[0], size + 1, timeoutMs); } + while (rc < 0 && errno == EINTR); - int revents = 0; - if (rc == 1) + // Fire the callback and remove dead fds. + for (int i = static_cast<int>(size) - 1; i >= 0; --i) { - if (pollfd.revents & (POLLERR|POLLHUP|POLLNVAL)) + if (_pollFds[i].revents) { - // Probe socket for error. - return -1; + if (_pollSockets[i]->handlePoll(_pollFds[i].revents) == + Socket::HandleResult::SOCKET_CLOSED) + { + std::cout << "Removing: " << _pollFds[i].fd << std::endl; + _pollSockets.erase(_pollSockets.begin() + i); + // Don't remove from pollFds; we'll recreate below. + } } + } - if (pollfd.revents & (POLLIN|POLLPRI)) - { - // Data ready to read. - revents |= POLLIN; - } + // Process the wakeup pipe (always the last entry). + if (_pollFds[size].revents) + { + // Add new sockets first. + addNewSocketsToPoll(); - if (pollfd.revents & POLLOUT) + // Clear the data. + int dump; + if (::read(_wakeup[0], &dump, sizeof(dump)) == -1) { - // Ready for write. - revents |= POLLOUT; + // Nothing to do. } } - - return revents; } - /// Poll the socket for readability. - /// Returns true when there is data to read, otherwise false. - bool pollRead(const int timeoutMs) + /// Insert a new socket to be polled. + /// Sockets are removed only when the handler return false. + void insertNewSocket(const std::shared_ptr<Socket>& newSocket) { - const int rc = poll(timeoutMs, POLLIN); - return (rc > 0 && (rc & POLLIN)); + std::lock_guard<std::mutex> lock(_mutex); + + _newSockets.emplace_back(newSocket); + + // wakeup the main-loop. + if (::write(_wakeup[1], "w", 1) == -1) + { + // wakeup pipe is already full. + assert(errno == EAGAIN || errno == EWOULDBLOCK); + } } - /// Poll the socket for writability. - /// Returns true when socket is ready for writing, otherwise false. - bool pollWrite(const int timeoutMs) +private: + + /// Add the new sockets to list of those to poll. + void addNewSocketsToPoll() { - const int rc = poll(timeoutMs, POLLOUT); - return (rc > 0 && (rc & POLLOUT)); + std::lock_guard<std::mutex> lock(_mutex); + + // Copy the new sockets over and clear. + _pollSockets.insert(_pollSockets.end(), _newSockets.begin(), _newSockets.end()); + _newSockets.clear(); } -protected: + void removeSocketFromPoll(const std::shared_ptr<Socket>& socket) + { + auto it = std::find(_pollSockets.begin(), _pollSockets.end(), socket); + assert (it != _pollSockets.end()); + _pollSockets.erase(it); + } - /// Construct based on an existing socket fd. - /// Used by accept() only. - Socket(const int fd) : - _fd(fd) + /// Initialize the poll fds array with the right events + void setupPollFds() { + const size_t size = _pollSockets.size(); + + _pollFds.resize(size + 1); // + wakeup pipe + + for (size_t i = 0; i < size; ++i) + { + _pollFds[i].fd = _pollSockets[i]->getFD(); + _pollFds[i].events = _pollSockets[i]->getPollEvents(); + _pollFds[i].revents = 0; + } + + // Add the read-end of the wake pipe. + _pollFds[size].fd = _wakeup[0]; + _pollFds[size].events = POLLIN; + _pollFds[size].revents = 0; } private: - const int _fd; + /// main-loop wakeup pipe + int _wakeup[2]; + /// The sockets we're controlling + std::vector<std::shared_ptr<Socket>> _pollSockets; + /// Protects _newSockets + std::mutex _mutex; + std::vector<std::shared_ptr<Socket>> _newSockets; + /// The fds to poll. + std::vector<pollfd> _pollFds; }; + /// A non-blocking, client socket. class ClientSocket : public Socket { @@ -197,36 +281,28 @@ public: { } - /// Connect to a server address. - /// Does not retry on error. - /// timeoutMs can be 0 to avoid waiting, or -1 to wait forever. - /// Returns true on success only. - /// Note: when succceeds, caller must check for - /// EINPROGRESS and poll for write, then getError(), - /// only when the latter returns 0 we are connected. - bool connect(const Poco::Net::SocketAddress& address, const int timeoutMs = 0) + protected: + std::vector< unsigned char > _inBuffer; + std::vector< unsigned char > _outBuffer; + public: + + HandleResult handlePoll( int events ) override { - const int rc = ::connect(getFD(), address.addr(), address.length()); - if (rc == 0) - { - return true; - } + bool closeSocket = false; - if (errno != EINPROGRESS) - { - return false; - } + if (events & POLLIN) + closeSocket = !readIncomingData(); - // Wait for writable, then check again. - pollWrite(timeoutMs); + if (events & POLLOUT) + writeOutgoingData(); - // Now check if we connected, not, or not yet. - return (getError() == 0 || errno == EINPROGRESS); + if (events & (POLLHUP | POLLERR | POLLNVAL)) + closeSocket = true; + + return closeSocket ? HandleResult::SOCKET_CLOSED : + HandleResult::CONTINUE; } - protected: - std::vector< unsigned char > _inBuffer; - std::vector< unsigned char > _outBuffer; - public: + bool readIncomingData() { ssize_t len; @@ -260,7 +336,7 @@ public: // else poll will handle errors } - int getPollEvents() + int getPollEvents() override { int pollFor = POLLIN; if (_outBuffer.size() > 0) @@ -279,47 +355,4 @@ protected: friend class ServerSocket; }; -/// A non-blocking, streaming socket. -class ServerSocket : public Socket -{ -public: - - /// Binds to a local address (Servers only). - /// Does not retry on error. - /// Returns true on success only. - bool bind(const Poco::Net::SocketAddress& address) - { - // Enable address reuse to avoid stalling after - // recycling, when previous socket is TIME_WAIT. - //TODO: Might be worth refactoring out. - const int reuseAddress = 1; - constexpr unsigned int len = sizeof(reuseAddress); - ::setsockopt(getFD(), SOL_SOCKET, SO_REUSEADDR, &reuseAddress, len); - - const int rc = ::bind(getFD(), address.addr(), address.length()); - return (rc == 0); - } - - /// Listen to incoming connections (Servers only). - /// Does not retry on error. - /// Returns true on success only. - bool listen(const int backlog = 64) - { - const int rc = ::listen(getFD(), backlog); - return (rc == 0); - } - - /// Accepts an incoming connection (Servers only). - /// Does not retry on error. - /// Returns a valid Socket shared_ptr on success only. - template <typename T> - std::shared_ptr<T> accept() - { - // Accept a connection (if any) and set it to non-blocking. - // We don't care about the client's address, so ignored. - const int rc = ::accept4(getFD(), nullptr, nullptr, SOCK_NONBLOCK); - return std::shared_ptr<T>(rc != -1 ? new T(rc) : nullptr); - } -}; - /* vim:set shiftwidth=4 softtabstop=4 expandtab: */ _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits