 loleaflet/js/global.js | 12 +++++++-----
 wsd/DocumentBroker.hpp |  3 ++-
 wsd/LOOLWSD.cpp        | 12 +++++++++---
 wsd/ProxyProtocol.cpp  | 42 +++++++++++++++++++++++++++---------------
 wsd/ProxyProtocol.hpp  |  7 ++-----
 5 files changed, 47 insertions(+), 29 deletions(-)
New commits: commit 1cb84cb89772ca1d84d6854ae9c4ea6a7bdb2dcc Author: Michael Meeks <michael.me...@collabora.com> AuthorDate: Fri Mar 20 20:15:08 2020 +0000 Commit: Michael Meeks <michael.me...@collabora.com> CommitDate: Fri Mar 20 20:15:08 2020 +0000 Proxy: open four wait sockets concurrently. Change-Id: I08b85677be528b7aa77272a8527c9bacf3f7c336 diff --git a/loleaflet/js/global.js b/loleaflet/js/global.js index 24cc2f77f..4e7e4667c 100644 --- a/loleaflet/js/global.js +++ b/loleaflet/js/global.js @@ -187,7 +187,7 @@ this.sessionId = 'fetchsession'; this.id = window.proxySocketCounter++; this.sendCounter = 0; - this.readWaiting = false; + this.readWaiting = 0; this.onclose = function() { }; this.onerror = function() { @@ -296,9 +296,9 @@ // horrors ... this.readInterval = setInterval(function() { - if (this.readWaiting) // one at a time for now + if (that.readWaiting > 4) // max 4 waiting connections concurrently. return; - if (this.sessionId == 'fetchsession') + if (that.sessionId == 'fetchsession') return; // waiting for our session id. 
var req = new XMLHttpRequest(); // fetch session id: @@ -307,13 +307,15 @@ that.parseIncomingArray(new Uint8Array(this.response)); else console.debug('Handle error ' + this.status); - that.readWaiting = false; + }); + req.addEventListener('loadend', function() { + that.readWaiting--; }); req.open('GET', that.getEndPoint('read')); req.setRequestHeader('SessionId', that.sessionId); req.responseType = 'arraybuffer'; req.send(''); - that.readWaiting = true; + that.readWaiting++; }, 250); }; diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp index cb5bf54e3..6bcb606f3 100644 --- a/wsd/DocumentBroker.hpp +++ b/wsd/DocumentBroker.hpp @@ -258,7 +258,8 @@ public: const Poco::URI& uriPublic, const bool isReadOnly, const std::string& hostNoTrust, - const std::shared_ptr<StreamSocket> &socket); + const std::shared_ptr<StreamSocket> &socket, + bool isWaiting); /// Thread safe termination of this broker if it has a lingering thread void joinThread(); diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp index 69bf3357e..116b5d432 100644 --- a/wsd/LOOLWSD.cpp +++ b/wsd/LOOLWSD.cpp @@ -2831,11 +2831,16 @@ private: // Request a kit process for this doc. std::shared_ptr<DocumentBroker> docBroker = findOrCreateDocBroker( none, url, docKey, _id, uriPublic); + + std::string fullURL = request.getURI(); + std::string ending = "/ws/read"; + bool isWaiting = (fullURL.size() > ending.size() && + std::equal(ending.rbegin(), ending.rend(), fullURL.rbegin())); if (docBroker) { // need to move into the DocumentBroker context before doing session lookup / creation etc. std::string id = _id; - disposition.setMove([docBroker, id, uriPublic, isReadOnly, hostNoTrust, sessionId] + disposition.setMove([docBroker, id, uriPublic, isReadOnly, hostNoTrust, sessionId, isWaiting] (const std::shared_ptr<Socket> &moveSocket) { LOG_TRC("Setting up docbroker thread for " << docBroker->getDocKey()); @@ -2845,7 +2850,8 @@ private: // We no longer own this socket. 
moveSocket->setThreadOwner(std::thread::id()); - docBroker->addCallback([docBroker, id, uriPublic, isReadOnly, hostNoTrust, sessionId, moveSocket]() + docBroker->addCallback([docBroker, id, uriPublic, isReadOnly, hostNoTrust, + sessionId, moveSocket, isWaiting]() { // Now inside the document broker thread ... LOG_TRC("In the docbroker thread for " << docBroker->getDocKey()); @@ -2855,7 +2861,7 @@ private: { docBroker->handleProxyRequest( sessionId, id, uriPublic, isReadOnly, - hostNoTrust, streamSocket); + hostNoTrust, streamSocket, isWaiting); return; } catch (const UnauthorizedRequestException& exc) diff --git a/wsd/ProxyProtocol.cpp b/wsd/ProxyProtocol.cpp index 8aaff0131..25602f146 100644 --- a/wsd/ProxyProtocol.cpp +++ b/wsd/ProxyProtocol.cpp @@ -25,7 +25,8 @@ void DocumentBroker::handleProxyRequest( const Poco::URI& uriPublic, const bool isReadOnly, const std::string& hostNoTrust, - const std::shared_ptr<StreamSocket> &socket) + const std::shared_ptr<StreamSocket> &socket, + bool isWaiting) { std::shared_ptr<ClientSession> clientSession; if (sessionId == "fetchsession") @@ -82,7 +83,7 @@ void DocumentBroker::handleProxyRequest( auto proxy = std::static_pointer_cast<ProxyProtocolHandler>( protocol); - proxy->handleRequest(uriPublic.toString(), socket); + proxy->handleRequest(isWaiting, socket); } bool ProxyProtocolHandler::parseEmitIncoming( @@ -128,16 +129,13 @@ bool ProxyProtocolHandler::parseEmitIncoming( return true; } -void ProxyProtocolHandler::handleRequest(const std::string &uriPublic, - const std::shared_ptr<Socket> &socket) +void ProxyProtocolHandler::handleRequest(bool isWaiting, const std::shared_ptr<Socket> &socket) { auto streamSocket = std::static_pointer_cast<StreamSocket>(socket); - bool bRead = uriPublic.find("/write") == std::string::npos; - LOG_INF("Proxy handle request " << uriPublic << " type: " << - (bRead ? "read" : "write")); + LOG_INF("Proxy handle request type: " << (isWaiting ? 
"wait" : "respond")); - if (bRead) + if (!isWaiting) { if (!_msgHandler) LOG_WRN("unusual - incoming message with no-one to handle it"); @@ -149,13 +147,27 @@ void ProxyProtocolHandler::handleRequest(const std::string &uriPublic, } } - if (!flushQueueTo(streamSocket) && !bRead) + if (!flushQueueTo(streamSocket) && isWaiting) { - // longer running 'write socket' - _writeSockets.push_back(streamSocket); + LOG_TRC("Queue a waiting socket"); + // longer running 'write socket' (marked 'read' by the client) + _outSockets.push_back(streamSocket); + if (_outSockets.size() > 16) + { + LOG_ERR("Unexpected - client opening many concurrent waiting connections " << _outSockets.size()); + // cleanup older waiting sockets. + auto sockWeak = _outSockets.front(); + _outSockets.erase(_outSockets.begin()); + auto sock = sockWeak.lock(); + if (sock) + sock->shutdown(); + } } else + { + LOG_TRC("Return a reply immediately"); socket->shutdown(); + } } void ProxyProtocolHandler::handleIncomingMessage(SocketDisposition &disposition) @@ -202,7 +214,7 @@ void ProxyProtocolHandler::getIOStats(uint64_t &sent, uint64_t &recv) void ProxyProtocolHandler::dumpState(std::ostream& os) { - os << "proxy protocol sockets: " << _writeSockets.size() << " writeQueue: " << _writeQueue.size() << ":\n"; + os << "proxy protocol sockets: " << _outSockets.size() << " writeQueue: " << _writeQueue.size() << ":\n"; for (auto it : _writeQueue) Util::dumpHex(os, "\twrite queue entry:", "\t\t", *it); } @@ -256,10 +268,10 @@ bool ProxyProtocolHandler::flushQueueTo(const std::shared_ptr<StreamSocket> &soc std::shared_ptr<StreamSocket> ProxyProtocolHandler::popWriteSocket() { std::weak_ptr<StreamSocket> sock; - while (!_writeSockets.empty()) + while (!_outSockets.empty()) { - sock = _writeSockets.front(); - _writeSockets.erase(_writeSockets.begin()); + sock = _outSockets.front(); + _outSockets.erase(_outSockets.begin()); auto realSock = sock.lock(); if (realSock) return realSock; diff --git a/wsd/ProxyProtocol.hpp 
b/wsd/ProxyProtocol.hpp index 091ac3295..ca7070b27 100644 --- a/wsd/ProxyProtocol.hpp +++ b/wsd/ProxyProtocol.hpp @@ -61,11 +61,8 @@ public: void shutdown(bool goingAway = false, const std::string &statusMessage = "") override; void getIOStats(uint64_t &sent, uint64_t &recv) override; void dumpState(std::ostream& os); - bool parseEmitIncoming(const std::shared_ptr<StreamSocket> &socket); - - void handleRequest(const std::string &uriPublic, - const std::shared_ptr<Socket> &socket); + void handleRequest(bool isWaiting, const std::shared_ptr<Socket> &socket); private: std::shared_ptr<StreamSocket> popWriteSocket(); @@ -89,7 +86,7 @@ private: }; /// queue things when we have no socket to hand. std::vector<std::shared_ptr<Message>> _writeQueue; - std::vector<std::weak_ptr<StreamSocket>> _writeSockets; + std::vector<std::weak_ptr<StreamSocket>> _outSockets; }; #endif _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits