loleaflet/js/global.js |   12 +++++++-----
 wsd/DocumentBroker.hpp |    3 ++-
 wsd/LOOLWSD.cpp        |   12 +++++++++---
 wsd/ProxyProtocol.cpp  |   42 +++++++++++++++++++++++++++---------------
 wsd/ProxyProtocol.hpp  |    7 ++-----
 5 files changed, 47 insertions(+), 29 deletions(-)

New commits:
commit 1cb84cb89772ca1d84d6854ae9c4ea6a7bdb2dcc
Author:     Michael Meeks <michael.me...@collabora.com>
AuthorDate: Fri Mar 20 20:15:08 2020 +0000
Commit:     Michael Meeks <michael.me...@collabora.com>
CommitDate: Fri Mar 20 20:15:08 2020 +0000

    Proxy: open four wait sockets concurrently.
    
    Change-Id: I08b85677be528b7aa77272a8527c9bacf3f7c336

diff --git a/loleaflet/js/global.js b/loleaflet/js/global.js
index 24cc2f77f..4e7e4667c 100644
--- a/loleaflet/js/global.js
+++ b/loleaflet/js/global.js
@@ -187,7 +187,7 @@
                this.sessionId = 'fetchsession';
                this.id = window.proxySocketCounter++;
                this.sendCounter = 0;
-               this.readWaiting = false;
+               this.readWaiting = 0;
                this.onclose = function() {
                };
                this.onerror = function() {
@@ -296,9 +296,9 @@
 
                // horrors ...
                this.readInterval = setInterval(function() {
-                       if (this.readWaiting) // one at a time for now
+                       if (that.readWaiting > 4) // max 4 waiting connections concurrently.
                                return;
-                       if (this.sessionId == 'fetchsession')
+                       if (that.sessionId == 'fetchsession')
                                return; // waiting for our session id.
                        var req = new XMLHttpRequest();
                        // fetch session id:
@@ -307,13 +307,15 @@
                                        that.parseIncomingArray(new Uint8Array(this.response));
                                else
                                        console.debug('Handle error ' + this.status);
-                               that.readWaiting = false;
+                       });
+                       req.addEventListener('loadend', function() {
+                               that.readWaiting--;
                        });
                        req.open('GET', that.getEndPoint('read'));
                        req.setRequestHeader('SessionId', that.sessionId);
                        req.responseType = 'arraybuffer';
                        req.send('');
-                       that.readWaiting = true;
+                       that.readWaiting++;
                }, 250);
        };
 
diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp
index cb5bf54e3..6bcb606f3 100644
--- a/wsd/DocumentBroker.hpp
+++ b/wsd/DocumentBroker.hpp
@@ -258,7 +258,8 @@ public:
         const Poco::URI& uriPublic,
         const bool isReadOnly,
         const std::string& hostNoTrust,
-        const std::shared_ptr<StreamSocket> &socket);
+        const std::shared_ptr<StreamSocket> &socket,
+        bool isWaiting);
 
     /// Thread safe termination of this broker if it has a lingering thread
     void joinThread();
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 69bf3357e..116b5d432 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -2831,11 +2831,16 @@ private:
         // Request a kit process for this doc.
         std::shared_ptr<DocumentBroker> docBroker = findOrCreateDocBroker(
             none, url, docKey, _id, uriPublic);
+
+        std::string fullURL = request.getURI();
+        std::string ending = "/ws/read";
+        bool isWaiting = (fullURL.size() > ending.size() &&
+                          std::equal(ending.rbegin(), ending.rend(), fullURL.rbegin()));
         if (docBroker)
         {
             // need to move into the DocumentBroker context before doing session lookup / creation etc.
             std::string id = _id;
-            disposition.setMove([docBroker, id, uriPublic, isReadOnly, hostNoTrust, sessionId]
+            disposition.setMove([docBroker, id, uriPublic, isReadOnly, hostNoTrust, sessionId, isWaiting]
                                 (const std::shared_ptr<Socket> &moveSocket)
                 {
                     LOG_TRC("Setting up docbroker thread for " << docBroker->getDocKey());
@@ -2845,7 +2850,8 @@ private:
                     // We no longer own this socket.
                     moveSocket->setThreadOwner(std::thread::id());
 
-                    docBroker->addCallback([docBroker, id, uriPublic, isReadOnly, hostNoTrust, sessionId, moveSocket]()
+                    docBroker->addCallback([docBroker, id, uriPublic, isReadOnly, hostNoTrust,
+                                            sessionId, moveSocket, isWaiting]()
                         {
                             // Now inside the document broker thread ...
                             LOG_TRC("In the docbroker thread for " << docBroker->getDocKey());
@@ -2855,7 +2861,7 @@ private:
                             {
                                 docBroker->handleProxyRequest(
                                     sessionId, id, uriPublic, isReadOnly,
-                                    hostNoTrust, streamSocket);
+                                    hostNoTrust, streamSocket, isWaiting);
                                 return;
                             }
                             catch (const UnauthorizedRequestException& exc)
diff --git a/wsd/ProxyProtocol.cpp b/wsd/ProxyProtocol.cpp
index 8aaff0131..25602f146 100644
--- a/wsd/ProxyProtocol.cpp
+++ b/wsd/ProxyProtocol.cpp
@@ -25,7 +25,8 @@ void DocumentBroker::handleProxyRequest(
     const Poco::URI& uriPublic,
     const bool isReadOnly,
     const std::string& hostNoTrust,
-    const std::shared_ptr<StreamSocket> &socket)
+    const std::shared_ptr<StreamSocket> &socket,
+    bool isWaiting)
 {
     std::shared_ptr<ClientSession> clientSession;
     if (sessionId == "fetchsession")
@@ -82,7 +83,7 @@ void DocumentBroker::handleProxyRequest(
     auto proxy = std::static_pointer_cast<ProxyProtocolHandler>(
         protocol);
 
-    proxy->handleRequest(uriPublic.toString(), socket);
+    proxy->handleRequest(isWaiting, socket);
 }
 
 bool ProxyProtocolHandler::parseEmitIncoming(
@@ -128,16 +129,13 @@ bool ProxyProtocolHandler::parseEmitIncoming(
     return true;
 }
 
-void ProxyProtocolHandler::handleRequest(const std::string &uriPublic,
-                                         const std::shared_ptr<Socket> &socket)
+void ProxyProtocolHandler::handleRequest(bool isWaiting, const std::shared_ptr<Socket> &socket)
 {
     auto streamSocket = std::static_pointer_cast<StreamSocket>(socket);
 
-    bool bRead = uriPublic.find("/write") == std::string::npos;
-    LOG_INF("Proxy handle request " << uriPublic << " type: " <<
-            (bRead ? "read" : "write"));
+    LOG_INF("Proxy handle request type: " << (isWaiting ? "wait" : "respond"));
 
-    if (bRead)
+    if (!isWaiting)
     {
         if (!_msgHandler)
             LOG_WRN("unusual - incoming message with no-one to handle it");
@@ -149,13 +147,27 @@ void ProxyProtocolHandler::handleRequest(const std::string &uriPublic,
         }
     }
 
-    if (!flushQueueTo(streamSocket) && !bRead)
+    if (!flushQueueTo(streamSocket) && isWaiting)
     {
-        // longer running 'write socket'
-        _writeSockets.push_back(streamSocket);
+        LOG_TRC("Queue a waiting socket");
+        // longer running 'write socket' (marked 'read' by the client)
+        _outSockets.push_back(streamSocket);
+        if (_outSockets.size() > 16)
+        {
+            LOG_ERR("Unexpected - client opening many concurrent waiting connections " << _outSockets.size());
+            // cleanup older waiting sockets.
+            auto sockWeak = _outSockets.front();
+            _outSockets.erase(_outSockets.begin());
+            auto sock = sockWeak.lock();
+            if (sock)
+                sock->shutdown();
+        }
     }
     else
+    {
+        LOG_TRC("Return a reply immediately");
         socket->shutdown();
+    }
 }
 
 void ProxyProtocolHandler::handleIncomingMessage(SocketDisposition &disposition)
@@ -202,7 +214,7 @@ void ProxyProtocolHandler::getIOStats(uint64_t &sent, uint64_t &recv)
 
 void ProxyProtocolHandler::dumpState(std::ostream& os)
 {
-    os << "proxy protocol sockets: " << _writeSockets.size() << " writeQueue: " << _writeQueue.size() << ":\n";
+    os << "proxy protocol sockets: " << _outSockets.size() << " writeQueue: " << _writeQueue.size() << ":\n";
     for (auto it : _writeQueue)
         Util::dumpHex(os, "\twrite queue entry:", "\t\t", *it);
 }
@@ -256,10 +268,10 @@ bool ProxyProtocolHandler::flushQueueTo(const std::shared_ptr<StreamSocket> &soc
 std::shared_ptr<StreamSocket> ProxyProtocolHandler::popWriteSocket()
 {
     std::weak_ptr<StreamSocket> sock;
-    while (!_writeSockets.empty())
+    while (!_outSockets.empty())
     {
-        sock = _writeSockets.front();
-        _writeSockets.erase(_writeSockets.begin());
+        sock = _outSockets.front();
+        _outSockets.erase(_outSockets.begin());
         auto realSock = sock.lock();
         if (realSock)
             return realSock;
diff --git a/wsd/ProxyProtocol.hpp b/wsd/ProxyProtocol.hpp
index 091ac3295..ca7070b27 100644
--- a/wsd/ProxyProtocol.hpp
+++ b/wsd/ProxyProtocol.hpp
@@ -61,11 +61,8 @@ public:
     void shutdown(bool goingAway = false, const std::string &statusMessage = "") override;
     void getIOStats(uint64_t &sent, uint64_t &recv) override;
     void dumpState(std::ostream& os);
-
     bool parseEmitIncoming(const std::shared_ptr<StreamSocket> &socket);
-
-    void handleRequest(const std::string &uriPublic,
-                       const std::shared_ptr<Socket> &socket);
+    void handleRequest(bool isWaiting, const std::shared_ptr<Socket> &socket);
 
 private:
     std::shared_ptr<StreamSocket> popWriteSocket();
@@ -89,7 +86,7 @@ private:
     };
     /// queue things when we have no socket to hand.
     std::vector<std::shared_ptr<Message>> _writeQueue;
-    std::vector<std::weak_ptr<StreamSocket>> _writeSockets;
+    std::vector<std::weak_ptr<StreamSocket>> _outSockets;
 };
 
 #endif
_______________________________________________
Libreoffice-commits mailing list
libreoffice-comm...@lists.freedesktop.org
https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits

Reply via email to