 net/Socket.hpp |   64 +++++++++++++++++++++++++++++++++------------------------
 1 file changed, 38 insertions(+), 26 deletions(-)
New commits:
commit 6a34272fdd10b2c4a5ba2ad6c7fb8a9e56e3dc6a
Author: Michael Meeks <michael.me...@collabora.com>
Date:   Fri Feb 24 23:14:09 2017 +0000

    Add thread-safe cross-thread callbacks, split out wakeup and simplify

diff --git a/net/Socket.hpp b/net/Socket.hpp
index d78e3c9..4b49b08 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -217,49 +217,60 @@ public:
         // Process the wakeup pipe (always the last entry).
         if (_pollFds[size].revents)
         {
-            // Add new sockets first.
-            addNewSocketsToPoll();
+            std::vector<CallbackFn> invoke;
+            {
+                std::lock_guard<std::mutex> lock(_mutex);
+
+                // Clear the data.
+                int dump = ::read(_wakeup[0], &dump, sizeof(dump));
 
-            // Clear the data.
-            int dump = ::read(_wakeup[0], &dump, sizeof(dump));
+                // Copy the new sockets over and clear.
+                _pollSockets.insert(_pollSockets.end(),
+                                    _newSockets.begin(), _newSockets.end());
+                _newSockets.clear();
+
+                // Extract list of callbacks to process
+                std::swap(_newCallbacks, invoke);
+            }
+
+            for (size_t i = 0; i < invoke.size(); ++i)
+                invoke[i]();
         }
     }
 
+    /// Wakeup the main polling loop in another thread
+    void wakeup()
+    {
+        // wakeup the main-loop.
+        int rc;
+        do {
+            rc = ::write(_wakeup[1], "w", 1);
+        } while (rc == -1 && errno == EINTR);
+
+        assert (rc != -1 || errno == EAGAIN || errno == EWOULDBLOCK);
+    }
+
     /// Insert a new socket to be polled.
     /// Sockets are removed only when the handler return false.
     void insertNewSocket(const std::shared_ptr<Socket>& newSocket)
     {
         std::lock_guard<std::mutex> lock(_mutex);
-
         _newSockets.emplace_back(newSocket);
-
-        // wakeup the main-loop.
-        int rc;
-        do
-        {
-            // wakeup pipe is already full.
-            rc = ::write(_wakeup[1], "w", 1);
-        }
-        while (rc == -1 && errno == EINTR);
-
-        if (rc == -1)
-        {
-            assert(errno == EAGAIN || errno == EWOULDBLOCK);
-        }
+        wakeup();
     }
 
-private:
+    typedef std::function<void()> CallbackFn;
 
-    /// Add the new sockets to list of those to poll.
-    void addNewSocketsToPoll()
+    /// Add a callback to be invoked in the polling thread
+    void addCallback(CallbackFn fn)
     {
         std::lock_guard<std::mutex> lock(_mutex);
-
-        // Copy the new sockets over and clear.
-        _pollSockets.insert(_pollSockets.end(), _newSockets.begin(), _newSockets.end());
-        _newSockets.clear();
+        _newCallbacks.emplace_back(fn);
+        wakeup();
     }
 
+private:
+
     void removeSocketFromPoll(const std::shared_ptr<Socket>& socket)
     {
         auto it = std::find(_pollSockets.begin(), _pollSockets.end(), socket);
@@ -296,6 +307,7 @@ private:
     /// Protects _newSockets
     std::mutex _mutex;
     std::vector<std::shared_ptr<Socket>> _newSockets;
+    std::vector<CallbackFn> _newCallbacks;
     /// The fds to poll.
     std::vector<pollfd> _pollFds;
 };
_______________________________________________
Libreoffice-commits mailing list
libreoffice-comm...@lists.freedesktop.org
https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits