common/Util.cpp | 4 + net/Socket.cpp | 13 +++ net/Socket.hpp | 20 +++-- net/SslSocket.hpp | 3 net/WebSocketHandler.hpp | 15 ++-- net/loolnb.cpp | 9 +- wsd/Admin.cpp | 8 +- wsd/ClientSession.cpp | 34 ++++++++- wsd/ClientSession.hpp | 18 ----- wsd/DocumentBroker.cpp | 44 ++++++++---- wsd/DocumentBroker.hpp | 15 ++-- wsd/LOOLWSD.cpp | 167 +++++++++++++++++++++++------------------------ 12 files changed, 203 insertions(+), 147 deletions(-)
New commits: commit d0bb5cbdc7a2a76877b17bbc4283a76ff8cc4d52 Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Mon Mar 20 00:41:43 2017 -0400 wsd: restore forkit after crash Change-Id: Iacfcbfbf922897ea1bb9896d01a9a8afd4e194cc diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp index e096ade4..226fbf58 100644 --- a/wsd/LOOLWSD.cpp +++ b/wsd/LOOLWSD.cpp @@ -288,7 +288,7 @@ bool cleanupDocBrokers() /// Forks as many children as requested. /// Returns the number of children requested to spawn, /// -1 for error. -static bool forkChildren(const int number) +static int forkChildren(const int number) { Util::assertIsLocked(NewChildrenMutex); @@ -417,7 +417,20 @@ std::shared_ptr<ChildProcess> getNewChild_Blocks() if (rebalanceChildren(numPreSpawn) < 0) { // Fatal. Let's fail and retry at a higher level. - LOG_DBG("getNewChild: rebalancing of children failed."); + LOG_DBG("getNewChild: rebalancing of children failed. Checking and restoring forkit."); + + lockb.unlock(); + locka.unlock(); + LOOLWSD::checkAndRestoreForKit(); + if (chrono::duration_cast<chrono::milliseconds>(chrono::steady_clock::now() - startTime).count() < + CHILD_TIMEOUT_MS * 4) + { + // Try again. + locka.lock(); + lockb.lock(); + continue; + } + return nullptr; } @@ -1025,7 +1038,7 @@ bool LOOLWSD::checkAndRestoreForKit() { // Should never fail. LOG_FTL("Failed to spawn loolforkit."); - return Application::EXIT_SOFTWARE; + SigUtil::requestShutdown(); } } commit 53da72a1dc10f4f5f121bc167680493a123c0b88 Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Sun Mar 19 21:45:53 2017 -0400 wsd: fix hot looping the poll When no ping is sent, the ping time is not set, which results in the poll timeout being set to a negative value, forcing the poll to return immediately. This happens when sending ping before upgrading to WebSocket, which isn't common. One way to reproduce it, however, is to connect to the admin console with an unauthenticated socket. 
Change-Id: I9f3db1a02b8f8e2781d23d843e848068ad434958 diff --git a/net/WebSocketHandler.hpp b/net/WebSocketHandler.hpp index 72466d8f..9fe7aab6 100644 --- a/net/WebSocketHandler.hpp +++ b/net/WebSocketHandler.hpp @@ -262,6 +262,7 @@ public: if (_wsState == WSState::WS) { LOG_WRN("Attempted ping on non-upgraded websocket!"); + _pingSent = now; // Pretend we sent it to avoid timing out immediately. return; } LOG_TRC("Send ping message"); diff --git a/wsd/Admin.cpp b/wsd/Admin.cpp index 19819e26..bb235156 100644 --- a/wsd/Admin.cpp +++ b/wsd/Admin.cpp @@ -64,8 +64,8 @@ void AdminSocketHandler::handleMessage(bool /* fin */, WSOpCode /* code */, { if (tokens.count() < 2) { + LOG_DBG("Auth command without any token"); sendFrame("InvalidAuthToken"); - LOG_TRC("Auth command without any token"); shutdown(); return; } @@ -84,8 +84,8 @@ void AdminSocketHandler::handleMessage(bool /* fin */, WSOpCode /* code */, } else { + LOG_DBG("Invalid auth token"); sendFrame("InvalidAuthToken"); - LOG_TRC("Invalid auth token"); shutdown(); return; } @@ -93,10 +93,10 @@ void AdminSocketHandler::handleMessage(bool /* fin */, WSOpCode /* code */, if (!_isAuthenticated) { + LOG_DBG("Not authenticated - message is '" << firstLine << "' " << + tokens.count() << " first: '" << tokens[0] << "'"); sendFrame("NotAuthenticated"); shutdown(); - LOG_TRC("Not authenticated - message is '" << firstLine << "' " - << tokens.count() << " first: '" << tokens[0] << "'"); return; } else if (tokens[0] == "documents" || commit e9675ed6e1022e449c13802b9e2929f059a7fdf1 Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Sun Mar 19 19:07:42 2017 -0400 wsd: close socket when WS close handshake is complete We shouldn't send any more data after the client shuts down, or after we initiate shutdown. 
Change-Id: Ibf0cf61dcabe9d02ddcb7eb40b2df23712c5a136 diff --git a/net/SslSocket.hpp b/net/SslSocket.hpp index 6f134e89..3654c4b7 100644 --- a/net/SslSocket.hpp +++ b/net/SslSocket.hpp @@ -70,6 +70,9 @@ public: // Complete the bidirectional shutdown. SSL_shutdown(_ssl); } + + // Close the TCP Socket. + Socket::shutdown(); } bool readIncomingData() override diff --git a/net/WebSocketHandler.hpp b/net/WebSocketHandler.hpp index 8a535cb9..72466d8f 100644 --- a/net/WebSocketHandler.hpp +++ b/net/WebSocketHandler.hpp @@ -225,7 +225,7 @@ public: } // TCP Close. - socket->shutdown(); + socket->closeConnection(); break; default: handleMessage(fin, code, _wsPayload); commit 5a3b81216716b3be63e109295247026fae6f0600 Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Sun Mar 19 19:01:03 2017 -0400 wsd: stop DocBroker poll and terminate child when no more sessions Normally the last session either stopped and terminated the DocBroker upon saving the doc, or immediately upon removal when nothing to save. However, saving could fail, or the session could be disconnected by the client, or saving could timeout, etc. In all those failure scenarios the DocBroker should not linger as a zombie (alive but without sessions). Here we detect that we are left with no sessions and terminate correctly. Change-Id: I31862234e321f63e686f54fa69daacc1fa06ae75 diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp index 2fd40d6e..de09bccb 100644 --- a/wsd/DocumentBroker.cpp +++ b/wsd/DocumentBroker.cpp @@ -252,8 +252,18 @@ void DocumentBroker::pollThread() autoSave(false); last30SecCheckTime = std::chrono::steady_clock::now(); } + + // If all sessions have been removed, no reason to linger. + if (_sessions.empty()) + { + LOG_INF("No more sessions in doc [" << _docKey << "]. Terminating."); + _stop = true; + } } + // Terminate properly while we can. 
+ auto lock = getLock(); + terminateChild(lock, "", false); LOG_INF("Finished docBroker polling thread for docKey [" << _docKey << "]."); } commit 47bbbbb2dcf613e1f7816ab0b963427341c643bd Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Sun Mar 19 18:57:31 2017 -0400 wsd: correct prespawning of children and simplify The prisoner poll should wake every so often to check and rebalance the new children. However this didn't happen correctly and WSD would starve of children every so often. The frequency of checking and rebalancing of children should be reviewed and optimized. Also simplified the code to avoid rebalancing DocBrokers and only do NewChildren. Change-Id: Id3be34ed3a47c739b606ee7969088397d3807e7a diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp index f61c3381..e096ade4 100644 --- a/wsd/LOOLWSD.cpp +++ b/wsd/LOOLWSD.cpp @@ -290,7 +290,6 @@ bool cleanupDocBrokers() /// -1 for error. static bool forkChildren(const int number) { - Util::assertIsLocked(DocBrokersMutex); Util::assertIsLocked(NewChildrenMutex); if (number > 0) @@ -328,19 +327,17 @@ static bool cleanupChildren() { Util::assertIsLocked(NewChildrenMutex); - bool removed = false; - for (int i = NewChildren.size() - 1; i >= 0; --i) + const int count = NewChildren.size(); + for (int i = count - 1; i >= 0; --i) { if (!NewChildren[i]->isAlive()) { LOG_WRN("Removing dead spare child [" << NewChildren[i]->getPid() << "]."); - NewChildren.erase(NewChildren.begin() + i); - removed = true; } } - return removed; + return static_cast<int>(NewChildren.size()) != count; } /// Decides how many children need spawning and spanws. @@ -349,7 +346,6 @@ static bool cleanupChildren() /// -1 for error. static int rebalanceChildren(int balance) { - Util::assertIsLocked(DocBrokersMutex); Util::assertIsLocked(NewChildrenMutex); // Do the cleanup first. @@ -386,26 +382,9 @@ static int rebalanceChildren(int balance) /// Returns true only if at least one child was requested to spawn. 
static bool prespawnChildren() { -#if 1 // FIXME: why re-balance DockBrokers here ? ... - // First remove dead DocBrokers, if possible. - std::unique_lock<std::mutex> docBrokersLock(DocBrokersMutex, std::defer_lock); - if (!docBrokersLock.try_lock()) - { - // Busy, try again later. - return false; - } - - cleanupDocBrokers(); -#endif - + // Rebalance if not forking already. std::unique_lock<std::mutex> lock(NewChildrenMutex, std::defer_lock); - if (!lock.try_lock()) - { - // We are forking already? Try later. - return false; - } - - return rebalanceChildren(LOOLWSD::NumPreSpawnedChildren) > 0; + return lock.try_lock() && (rebalanceChildren(LOOLWSD::NumPreSpawnedChildren) > 0); } static size_t addNewChild(const std::shared_ptr<ChildProcess>& child) @@ -1220,10 +1199,6 @@ bool LOOLWSD::createForKit() // Init the Admin manager Admin::instance().setForKitPid(ForKitProcId); - // Wake the prisoner poll to spawn some children, if necessary. - PrisonerPoll.wakeup(); - // FIXME: horrors with try_lock in prespawnChildren ... - return (ForKitProcId != -1); #endif } @@ -2482,14 +2457,20 @@ int LOOLWSD::main(const std::vector<std::string>& /*args*/) mainWait.poll(SocketPoll::DefaultPollTimeoutMs * 2); + // Wake the prisoner poll to spawn some children, if necessary. 
+ PrisonerPoll.wakeup(); + // Unit test timeout if (std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::steady_clock::now() - startStamp).count() < UnitWSD::get().getTimeoutMilliSeconds()) UnitWSD::get().timeout(); - std::unique_lock<std::mutex> docBrokersLock(DocBrokersMutex); - cleanupDocBrokers(); + std::unique_lock<std::mutex> docBrokersLock(DocBrokersMutex, std::defer_lock); + if (docBrokersLock.try_lock()) + { + cleanupDocBrokers(); + } #if ENABLE_DEBUG if (careerSpanSeconds > 0 && time(nullptr) > startTimeSpan + careerSpanSeconds) commit 6283dbd9ccbd450d362a9d6d9c575726640df514 Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Sun Mar 19 18:54:07 2017 -0400 wsd: copy and un-mask web-socket data at the same time Change-Id: I2a4831065ae0a81f20d0513b0772d7d427ffc4ea diff --git a/net/WebSocketHandler.hpp b/net/WebSocketHandler.hpp index 9cb6a1d6..8a535cb9 100644 --- a/net/WebSocketHandler.hpp +++ b/net/WebSocketHandler.hpp @@ -185,11 +185,11 @@ public: if (hasMask) { + const size_t end = _wsPayload.size(); + _wsPayload.resize(end + payloadLen); + char* wsData = &_wsPayload[end]; for (size_t i = 0; i < payloadLen; ++i) - data[i] = data[i] ^ mask[i % 4]; - - // FIXME: copy and un-mask at the same time ... 
- _wsPayload.insert(_wsPayload.end(), data, data + payloadLen); + *wsData++ = data[i] ^ mask[i % 4]; } else _wsPayload.insert(_wsPayload.end(), data, data + payloadLen); commit 714eeac3aae724c25160c6b38ae8a4cf6bafbd35 Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Sun Mar 19 14:49:52 2017 -0400 wsd: simplify DocBroker creation Change-Id: Icc93af2e32ce544c42cc65bbea83b9539c044db9 diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp index 75b766b6..2fd40d6e 100644 --- a/wsd/DocumentBroker.cpp +++ b/wsd/DocumentBroker.cpp @@ -159,7 +159,8 @@ DocumentBroker::DocumentBroker(const std::string& uri, assert(!_docKey.empty()); assert(!_childRoot.empty()); - LOG_INF("DocumentBroker [" << _uriPublic.toString() << "] created. DocKey: [" << _docKey << "]"); + LOG_INF("DocumentBroker [" << _uriPublic.toString() << + "] created with docKey [" << _docKey << "] and root [" << _childRoot << "]"); } void DocumentBroker::startThread() @@ -202,6 +203,7 @@ void DocumentBroker::pollThread() } _childProcess->setDocumentBroker(shared_from_this()); + LOG_INF("Doc [" << _docKey << "] attached to child [" << _childProcess->getPid() << "]."); auto last30SecCheckTime = std::chrono::steady_clock::now(); diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp index 8508776b..f61c3381 100644 --- a/wsd/LOOLWSD.cpp +++ b/wsd/LOOLWSD.cpp @@ -1232,30 +1232,6 @@ bool LOOLWSD::createForKit() std::mutex Connection::Mutex; #endif -static std::shared_ptr<DocumentBroker> createDocBroker(WebSocketHandler& ws, - const std::string& uri, - const std::string& docKey, - const Poco::URI& uriPublic) -{ - Util::assertIsLocked(DocBrokersMutex); - - static_assert(MAX_DOCUMENTS > 0, "MAX_DOCUMENTS must be positive"); - if (DocBrokers.size() + 1 > MAX_DOCUMENTS) - { - LOG_ERR("Maximum number of open documents reached."); - shutdownLimitReached(ws); - return nullptr; - } - - // Set the one we just created. 
- LOG_DBG("New DocumentBroker for docKey [" << docKey << "]."); - auto docBroker = std::make_shared<DocumentBroker>(uri, uriPublic, docKey, LOOLWSD::ChildRoot); - DocBrokers.emplace(docKey, docBroker); - LOG_TRC("Have " << DocBrokers.size() << " DocBrokers after inserting [" << docKey << "]."); - - return docBroker; -} - /// Find the DocumentBroker for the given docKey, if one exists. /// Otherwise, creates and adds a new one to DocBrokers. /// May return null if terminating or MaxDocuments limit is reached. @@ -1312,7 +1288,23 @@ static std::shared_ptr<DocumentBroker> findOrCreateDocBroker(WebSocketHandler& w ws.sendFrame(statusConnect); if (!docBroker) - docBroker = createDocBroker(ws, uri, docKey, uriPublic); + { + Util::assertIsLocked(DocBrokersMutex); + + static_assert(MAX_DOCUMENTS > 0, "MAX_DOCUMENTS must be positive"); + if (DocBrokers.size() + 1 > MAX_DOCUMENTS) + { + LOG_ERR("Maximum number of open documents reached."); + shutdownLimitReached(ws); + return nullptr; + } + + // Set the one we just created. + LOG_DBG("New DocumentBroker for docKey [" << docKey << "]."); + docBroker = std::make_shared<DocumentBroker>(uri, uriPublic, docKey, LOOLWSD::ChildRoot); + DocBrokers.emplace(docKey, docBroker); + LOG_TRC("Have " << DocBrokers.size() << " DocBrokers after inserting [" << docKey << "]."); + } return docBroker; } commit 0d3ea2bbfdc3a619228beb171688885376082065 Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Sun Mar 19 13:22:12 2017 -0400 wsd: flag thread start before creating thread This prevents a race where the thread is started a second time before the first gets a chance to set the flag. 
Change-Id: Ib106aa0626cdfa403b321822180b0545d3aa9139 diff --git a/net/Socket.cpp b/net/Socket.cpp index 7d165bb9..0e5d6eab 100644 --- a/net/Socket.cpp +++ b/net/Socket.cpp @@ -89,8 +89,17 @@ void SocketPoll::startThread() { if (!_threadStarted) { - _thread = std::thread(&SocketPoll::pollingThreadEntry, this); - _owner = _thread.get_id(); + _threadStarted = true; + try + { + _thread = std::thread(&SocketPoll::pollingThreadEntry, this); + _owner = _thread.get_id(); + } + catch (const std::exception& exc) + { + LOG_ERR("Failed to start poll thread: " << exc.what()); + _threadStarted = false; + } } } diff --git a/net/Socket.hpp b/net/Socket.hpp index fa5c0971..c40802f1 100644 --- a/net/Socket.hpp +++ b/net/Socket.hpp @@ -464,8 +464,6 @@ private: /// Used to set the thread name and mark the thread as stopped when done. void pollingThreadEntry() { - _threadStarted = true; - try { Util::setThreadName(_name); commit 2ba2e213bbe4f0561bd0916b4fc9a61777824e32 Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Sun Mar 19 10:25:06 2017 -0400 wsd: support rude termination of documents Termination should normally be initiated by the DocumentBroker in question, so sending of termination message on the sockets come from the correct thread. When termination happens from elsewhere (f.e. cleanupDocBrokers) we cannot send socket messages, and have to resort to rude termination. Change-Id: I94acb7b314f5dbdc45c57049fc1ac8527ba72fb9 diff --git a/wsd/ClientSession.cpp b/wsd/ClientSession.cpp index af0265c3..181fae67 100644 --- a/wsd/ClientSession.cpp +++ b/wsd/ClientSession.cpp @@ -605,7 +605,7 @@ bool ClientSession::handleKitToClientMessage(const char* buffer, const int lengt // Now terminate. 
auto lock = docBroker->getLock(); - docBroker->terminateChild(lock, ""); + docBroker->terminateChild(lock, "", true); } return true; diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp index e1362ec6..75b766b6 100644 --- a/wsd/DocumentBroker.cpp +++ b/wsd/DocumentBroker.cpp @@ -1252,7 +1252,7 @@ void DocumentBroker::childSocketTerminated() } } -void DocumentBroker::terminateChild(std::unique_lock<std::mutex>& lock, const std::string& closeReason) +void DocumentBroker::terminateChild(std::unique_lock<std::mutex>& lock, const std::string& closeReason, const bool rude) { Util::assertIsLocked(_mutex); Util::assertIsLocked(lock); @@ -1260,15 +1260,18 @@ void DocumentBroker::terminateChild(std::unique_lock<std::mutex>& lock, const st LOG_INF("Terminating doc [" << _docKey << "]."); // Close all running sessions - for (const auto& pair : _sessions) + if (!rude) { - try - { - pair.second->shutdown(WebSocketHandler::StatusCodes::ENDPOINT_GOING_AWAY, closeReason); - } - catch (const std::exception& ex) + for (const auto& pair : _sessions) { - LOG_ERR("Error while terminating client connection [" << pair.first << "]: " << ex.what()); + try + { + pair.second->shutdown(WebSocketHandler::StatusCodes::ENDPOINT_GOING_AWAY, closeReason); + } + catch (const std::exception& ex) + { + LOG_ERR("Error while terminating client connection [" << pair.first << "]: " << ex.what()); + } } } @@ -1278,12 +1281,15 @@ void DocumentBroker::terminateChild(std::unique_lock<std::mutex>& lock, const st // First flag to stop as it might be waiting on our lock // to process some incoming message. - _childProcess->stop(); + if (!rude) + { + _childProcess->stop(); + } // Release the lock and wait for the thread to finish. lock.unlock(); - _childProcess->close(false); + _childProcess->close(rude); } // Stop the polling thread. 
@@ -1295,7 +1301,7 @@ void DocumentBroker::closeDocument(const std::string& reason) auto lock = getLock(); LOG_DBG("Closing DocumentBroker for docKey [" << _docKey << "] with reason: " << reason); - terminateChild(lock, reason); + terminateChild(lock, reason, true); } void DocumentBroker::updateLastActivityTime() diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp index 6762ad8d..744cf102 100644 --- a/wsd/DocumentBroker.hpp +++ b/wsd/DocumentBroker.hpp @@ -115,12 +115,15 @@ public: { LOG_DBG("Closing ChildProcess [" << _pid << "]."); - // First mark to stop the thread so it knows it's intentional. - stop(); + if (!rude) + { + // First mark to stop the thread so it knows it's intentional. + stop(); - // Shutdown the socket to break the thread if blocked on it. - if (_ws) - _ws->shutdown(); + // Shutdown the socket. + if (_ws) + _ws->shutdown(); + } } catch (const std::exception& ex) { @@ -317,7 +320,7 @@ public: /// We must be called under lock and it must be /// passed to us so we unlock before waiting on /// the ChildProcess thread, which can take our lock. - void terminateChild(std::unique_lock<std::mutex>& lock, const std::string& closeReason); + void terminateChild(std::unique_lock<std::mutex>& lock, const std::string& closeReason, const bool rude); /// Get the PID of the associated child process Poco::Process::PID getPid() const { return _childProcess->getPid(); } diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp index 7b24449b..8508776b 100644 --- a/wsd/LOOLWSD.cpp +++ b/wsd/LOOLWSD.cpp @@ -249,7 +249,7 @@ bool cleanupDocBrokers() { LOG_INF("Terminating " << (idle ? "idle" : "dead") << " DocumentBroker for docKey [" << it->first << "]."); - docBroker->terminateChild(lock, idle ? "idle" : ""); + docBroker->terminateChild(lock, idle ? "idle" : "", true); // Remove only when not alive. 
if (!docBroker->isAlive()) @@ -1335,7 +1335,7 @@ static void removeDocBrokerSession(const std::shared_ptr<DocumentBroker>& docBro { LOG_INF("Removing unloaded DocumentBroker for docKey [" << docKey << "]."); DocBrokers.erase(docKey); - docBroker->terminateChild(lock, ""); + docBroker->terminateChild(lock, "", true); } } commit 14779f5cd4a966590442bda4fc8878a086d84df3 Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Sat Mar 18 23:22:56 2017 -0400 wsd: return moved socket state to stop any IO processing Once a socket has changed ownership to a new poll it will assert thread affinity with said new poll. So we cannot do any IO on the old poll's thread at that point and on. Change-Id: I662f188dea7c377a18f3e546839ec43f2875dc7b diff --git a/net/Socket.hpp b/net/Socket.hpp index bcdb518d..fa5c0971 100644 --- a/net/Socket.hpp +++ b/net/Socket.hpp @@ -79,7 +79,7 @@ public: int &timeoutMaxMs) = 0; /// Handle results of events returned from poll - enum class HandleResult { CONTINUE, SOCKET_CLOSED }; + enum class HandleResult { CONTINUE, SOCKET_CLOSED, MOVED }; virtual HandleResult handlePoll(std::chrono::steady_clock::time_point now, int events) = 0; /// manage latency issues around packet aggregation @@ -518,8 +518,14 @@ public: /// Will be called exactly once. virtual void onConnect(const std::weak_ptr<StreamSocket>& socket) = 0; + enum class SocketOwnership + { + UNCHANGED, //< Same socket poll, business as usual. + MOVED //< The socket poll is now different. + }; + /// Called after successful socket reads. - virtual void handleIncomingMessage() = 0; + virtual SocketHandlerInterface::SocketOwnership handleIncomingMessage() = 0; /// Prepare our poll record; adjust @timeoutMaxMs downwards /// for timeouts, based on current time @now. 
@@ -708,7 +714,8 @@ protected: while (!_inBuffer.empty() && oldSize != _inBuffer.size()) { oldSize = _inBuffer.size(); - _socketHandler->handleIncomingMessage(); + if (_socketHandler->handleIncomingMessage() == SocketHandlerInterface::SocketOwnership::MOVED) + return Socket::HandleResult::MOVED; } do diff --git a/net/WebSocketHandler.hpp b/net/WebSocketHandler.hpp index 8bdd04f6..9cb6a1d6 100644 --- a/net/WebSocketHandler.hpp +++ b/net/WebSocketHandler.hpp @@ -238,10 +238,12 @@ public: } /// Implementation of the SocketHandlerInterface. - virtual void handleIncomingMessage() override + virtual SocketHandlerInterface::SocketOwnership handleIncomingMessage() override { while (handleOneIncomingMessage()) ; // can have multiple msgs in one recv'd packet. + + return SocketHandlerInterface::SocketOwnership::UNCHANGED; } int getPollEvents(std::chrono::steady_clock::time_point now, diff --git a/net/loolnb.cpp b/net/loolnb.cpp index 566ab1eb..a014173a 100644 --- a/net/loolnb.cpp +++ b/net/loolnb.cpp @@ -45,7 +45,7 @@ public: { } - virtual void handleIncomingMessage() override + virtual SocketHandlerInterface::SocketOwnership handleIncomingMessage() override { LOG_TRC("incoming WebSocket message"); if (_wsState == WSState::HTTP) @@ -89,17 +89,16 @@ public: std::string str = oss.str(); socket->_outBuffer.insert(socket->_outBuffer.end(), str.begin(), str.end()); - return; + return SocketHandlerInterface::SocketOwnership::UNCHANGED; } else if (tokens.count() == 2 && tokens[1] == "ws") { - upgradeToWebSocket(req); - return; + return SocketHandlerInterface::SocketOwnership::UNCHANGED; } } - WebSocketHandler::handleIncomingMessage(); + return WebSocketHandler::handleIncomingMessage(); } virtual void handleMessage(const bool fin, const WSOpCode code, std::vector<char> &data) override diff --git a/wsd/ClientSession.cpp b/wsd/ClientSession.cpp index 08adefd0..af0265c3 100644 --- a/wsd/ClientSession.cpp +++ b/wsd/ClientSession.cpp @@ -51,12 +51,13 @@ ClientSession::~ClientSession() 
stop(); } -void ClientSession::handleIncomingMessage() +SocketHandlerInterface::SocketOwnership ClientSession::handleIncomingMessage() { if (UnitWSD::get().filterHandleRequest( UnitWSD::TestRequest::Client, *this)) - return; - Session::handleIncomingMessage(); + return SocketHandlerInterface::SocketOwnership::UNCHANGED; + + return Session::handleIncomingMessage(); } bool ClientSession::_handleInput(const char *buffer, int length) diff --git a/wsd/ClientSession.hpp b/wsd/ClientSession.hpp index 664f30d5..19aee2af 100644 --- a/wsd/ClientSession.hpp +++ b/wsd/ClientSession.hpp @@ -30,7 +30,7 @@ public: virtual ~ClientSession(); - void handleIncomingMessage() override; + SocketHandlerInterface::SocketOwnership handleIncomingMessage() override; void setReadOnly(); bool isReadOnly() const { return _isReadOnly; } diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp index 2e786a0d..7b24449b 100644 --- a/wsd/LOOLWSD.cpp +++ b/wsd/LOOLWSD.cpp @@ -1424,17 +1424,16 @@ private: } /// Called after successful socket reads. - void handleIncomingMessage() override + SocketHandlerInterface::SocketOwnership handleIncomingMessage() override { if (UnitWSD::get().filterHandleRequest( UnitWSD::TestRequest::Prisoner, *this)) - return; + return SocketHandlerInterface::SocketOwnership::UNCHANGED; if (_childProcess.lock()) { // FIXME: inelegant etc. - derogate to websocket code - WebSocketHandler::handleIncomingMessage(); - return; + return WebSocketHandler::handleIncomingMessage(); } auto socket = _socket.lock(); @@ -1447,7 +1446,7 @@ private: if (itBody == in.end()) { LOG_TRC("#" << socket->getFD() << " doesn't have enough data yet."); - return; + return SocketHandlerInterface::SocketOwnership::UNCHANGED; } // Skip the marker. @@ -1479,7 +1478,7 @@ private: if (request.getURI().find(NEW_CHILD_URI) != 0) { LOG_ERR("Invalid incoming URI."); - return; + return SocketHandlerInterface::SocketOwnership::UNCHANGED; } // New Child is spawned. 
@@ -1500,7 +1499,7 @@ private: if (pid <= 0) { LOG_ERR("Invalid PID in child URI [" << request.getURI() << "]."); - return; + return SocketHandlerInterface::SocketOwnership::UNCHANGED; } LOG_INF("New child [" << pid << "]."); @@ -1522,8 +1521,9 @@ private: { // Probably don't have enough data just yet. // TODO: timeout if we never get enough. - return; } + + return SocketHandlerInterface::SocketOwnership::UNCHANGED; } /// Prisoner websocket fun ... (for now) @@ -1586,9 +1586,10 @@ private: LOG_ERR("onDisconnect"); } - void handleIncomingMessage() override + SocketHandlerInterface::SocketOwnership handleIncomingMessage() override { LOG_ERR("handleIncomingMessage"); + return SocketHandlerInterface::SocketOwnership::UNCHANGED; } int getPollEvents(std::chrono::steady_clock::time_point /* now */, @@ -1626,7 +1627,7 @@ private: } /// Called after successful socket reads. - void handleIncomingMessage() override + SocketHandlerInterface::SocketOwnership handleIncomingMessage() override { auto socket = _socket.lock(); std::vector<char>& in = socket->_inBuffer; @@ -1638,7 +1639,7 @@ private: if (itBody == in.end()) { LOG_TRC("#" << socket->getFD() << " doesn't have enough data yet."); - return; + return SocketHandlerInterface::SocketOwnership::UNCHANGED; } // Skip the marker. @@ -1673,16 +1674,17 @@ private: if (contentLength != Poco::Net::HTTPMessage::UNKNOWN_CONTENT_LENGTH && available < contentLength) { LOG_DBG("Not enough content yet: ContentLength: " << contentLength << ", available: " << available); - return; + return SocketHandlerInterface::SocketOwnership::UNCHANGED; } } catch (const std::exception& exc) { // Probably don't have enough data just yet. // TODO: timeout if we never get enough. 
- return; + return SocketHandlerInterface::SocketOwnership::UNCHANGED; } + SocketHandlerInterface::SocketOwnership socketOwnership = SocketHandlerInterface::SocketOwnership::UNCHANGED; try { // Routing @@ -1732,7 +1734,7 @@ private: } else if (reqPathTokens.count() > 2 && reqPathTokens[0] == "lool" && reqPathTokens[2] == "ws") { - handleClientWsUpgrade(request, reqPathTokens[1]); + socketOwnership = handleClientWsUpgrade(request, reqPathTokens[1]); } else { @@ -1759,6 +1761,8 @@ private: // TODO: Send back failure. // NOTE: Check _wsState to choose between HTTP response or WebSocket (app-level) error. } + + return socketOwnership; } int getPollEvents(std::chrono::steady_clock::time_point /* now */, @@ -2097,7 +2101,7 @@ private: throw BadRequestException("Invalid or unknown request."); } - void handleClientWsUpgrade(const Poco::Net::HTTPRequest& request, const std::string& url) + SocketHandlerInterface::SocketOwnership handleClientWsUpgrade(const Poco::Net::HTTPRequest& request, const std::string& url) { // requestHandler = new ClientRequestHandler(); LOG_INF("Client WS request: " << request.getURI() << ", url: " << url); @@ -2109,7 +2113,7 @@ private: { LOG_ERR("Limit on maximum number of connections of " << MAX_CONNECTIONS << " reached."); shutdownLimitReached(ws); - return; + return SocketHandlerInterface::SocketOwnership::UNCHANGED; } LOG_INF("Starting GET request handler for session [" << _id << "] on url [" << url << "]."); @@ -2137,6 +2141,8 @@ private: LOG_INF("URL [" << url << "] is " << (isReadOnly ? "readonly" : "writable") << "."); + SocketHandlerInterface::SocketOwnership socketOwnership = SocketHandlerInterface::SocketOwnership::UNCHANGED; + // Request a kit process for this doc. auto docBroker = findOrCreateDocBroker(ws, url, docKey, _id, uriPublic); if (docBroker) @@ -2149,12 +2155,13 @@ private: auto socket = _socket.lock(); if (socket) { + // Set the ClientSession to handle Socket events. 
+ socket->setHandler(clientSession); + // Move the socket into DocBroker. WebServerPoll.releaseSocket(socket); docBroker->addSocketToPoll(socket); - - // Set the ClientSession to handle Socket events. - socket->setHandler(clientSession); + socketOwnership = SocketHandlerInterface::SocketOwnership::MOVED; } docBroker->startThread(); } @@ -2163,6 +2170,8 @@ private: } else LOG_WRN("Failed to create DocBroker with docKey [" << docKey << "]."); + + return socketOwnership; } private: commit 6e7bd4bcf039a7ef86c56ccff566c7391ad07a4c Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Sat Mar 18 22:21:29 2017 -0400 wsd: cannot broadcast alert messages from random thread Will have to come up with a different solution to broadcasting alerts to all users. Change-Id: I00260402f71c516f4335c592b10dee7555dc67a6 diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp index af5854f1..2e786a0d 100644 --- a/wsd/LOOLWSD.cpp +++ b/wsd/LOOLWSD.cpp @@ -215,10 +215,11 @@ void alertAllUsersInternal(const std::string& msg) LOG_INF("Alerting all users: [" << msg << "]"); - for (auto& brokerIt : DocBrokers) + //FIXME loolnb: due to thread-affinity of sockets we can't send from here. 
+ // for (auto& brokerIt : DocBrokers) { - auto lock = brokerIt.second->getLock(); - brokerIt.second->alertAllUsers(msg); + // auto lock = brokerIt.second->getLock(); + // brokerIt.second->alertAllUsers(msg); } } } commit 7096133f07ddaac2c7c0895ac6f26e043cffdbb3 Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Sat Mar 18 22:19:41 2017 -0400 wsd: log thread affinity violations Change-Id: Ib1317bc71f9162f005e0ce9b8c715bbce656db73 diff --git a/common/Util.cpp b/common/Util.cpp index ce1730b0..16335fe3 100644 --- a/common/Util.cpp +++ b/common/Util.cpp @@ -264,6 +264,10 @@ namespace Util { LOG_SYS("Cannot set thread name to " << s << "."); } + else + { + LOG_INF("Thread " << std::hex << std::this_thread::get_id() << std::dec << " is now called " << s); + } } void getVersionInfo(std::string& version, std::string& hash) diff --git a/net/Socket.hpp b/net/Socket.hpp index 50e84a39..bcdb518d 100644 --- a/net/Socket.hpp +++ b/net/Socket.hpp @@ -193,7 +193,10 @@ public: virtual bool isCorrectThread(bool hard = false) { #if ENABLE_DEBUG - bool sameThread = std::this_thread::get_id() == _owner; + const bool sameThread = std::this_thread::get_id() == _owner; + if (!sameThread) + LOG_WRN("#" << _fd << " invoked from foreign thread. 
Expected: " << + std::hex << _owner << std::dec); if (hard) return sameThread; else commit b1609a0087fcf4ecd074cd21d8d35e05740e7f5c Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Sat Mar 18 16:36:32 2017 -0400 wsd: cleanup session and docBroker after convert-to Change-Id: I7d9c8eeef61c23cc3f4f902b15953abd5ec6851a diff --git a/wsd/ClientSession.cpp b/wsd/ClientSession.cpp index 83024cfd..08adefd0 100644 --- a/wsd/ClientSession.cpp +++ b/wsd/ClientSession.cpp @@ -581,7 +581,32 @@ bool ClientSession::handleKitToClientMessage(const char* buffer, const int lengt } } - setSaveAsUrl(url); + if (_saveAsSocket) + { + Poco::URI resultURL(url); + LOG_TRC("Save-as URL: " << resultURL.toString()); + + // TODO: Send back error when there is no output. + if (!resultURL.getPath().empty()) + { + const std::string mimeType = "application/octet-stream"; + std::string encodedFilePath; + Poco::URI::encode(resultURL.getPath(), "", encodedFilePath); + LOG_TRC("Sending file: " << encodedFilePath); + HttpHelper::sendFile(_saveAsSocket, encodedFilePath, mimeType); + } + + // Conversion is done, cleanup this fake session. + LOG_TRC("Removing save-as ClientSession after conversion."); + + // Remove us. + docBroker->removeSession(getId()); + + // Now terminate. + auto lock = docBroker->getLock(); + docBroker->terminateChild(lock, ""); + } + return true; } else if (tokens.size() == 2 && tokens[0] == "statechanged:") diff --git a/wsd/ClientSession.hpp b/wsd/ClientSession.hpp index 90558b72..664f30d5 100644 --- a/wsd/ClientSession.hpp +++ b/wsd/ClientSession.hpp @@ -89,26 +89,12 @@ public: _senderQueue.stop(); } + /// Set the save-as socket which is used to send convert-to results. 
void setSaveAsSocket(const std::shared_ptr<StreamSocket>& socket) { _saveAsSocket = socket; } - void setSaveAsUrl(const std::string& url) - { - Poco::URI resultURL(url); - LOG_TRC("Save-as URL: " << resultURL.toString()); - - if (!resultURL.getPath().empty()) - { - const std::string mimeType = "application/octet-stream"; - std::string encodedFilePath; - Poco::URI::encode(resultURL.getPath(), "", encodedFilePath); - LOG_TRC("Sending file: " << encodedFilePath); - HttpHelper::sendFile(_saveAsSocket, encodedFilePath, mimeType); - } - } - std::shared_ptr<DocumentBroker> getDocumentBroker() const { return _docBroker.lock(); } /// Exact URI (including query params - access tokens etc.) with which diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp index 10a89233..e1362ec6 100644 --- a/wsd/DocumentBroker.cpp +++ b/wsd/DocumentBroker.cpp @@ -1191,7 +1191,7 @@ bool DocumentBroker::forwardToClient(const std::shared_ptr<Message>& payload) const auto& data = payload->data().data(); const auto& size = payload->size(); - std::unique_lock<std::mutex> lock(_mutex); + // std::unique_lock<std::mutex> lock(_mutex); if (sid == "all") { diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp index 55e52ccd..af5854f1 100644 --- a/wsd/LOOLWSD.cpp +++ b/wsd/LOOLWSD.cpp @@ -1539,7 +1539,8 @@ private: { // We should never destroy the broker, since // it owns us and will wait on this thread. - // FIXME: loolnb - check that comment ! + // This is true with non-blocking since this is + // called from DocumentBroker::pollThread. assert(docBroker.use_count() > 1); docBroker->handleInput(data); return; _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits