common/Protocol.hpp | 74 ++++++++++++++++++++-- test/TileQueueTests.cpp | 155 ++++++++++++++++++++++++++++++++++++++++++++++ test/WhiteBoxTests.cpp | 101 ++++++++++++++++++++++++++++++ wsd/ClientSession.hpp | 8 -- wsd/DocumentBroker.cpp | 4 - wsd/SenderQueue.hpp | 159 ++++++++++++++++++++++++++++++++++++++++++++++-- wsd/TileCache.cpp | 25 +++---- wsd/TileDesc.hpp | 13 +++ 8 files changed, 503 insertions(+), 36 deletions(-)
New commits: commit 8b59e78ad0b44a8f98ce18c8529e00227d0b357c Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Sat Dec 17 14:28:11 2016 -0500 wsd: message deduplication in SenderQueue Squashed multiple commits from master. Change-Id: I0ad9ef7bf826b3fd0eba9cc17ec5212a3334a2f5 Reviewed-on: https://gerrit.libreoffice.org/32164 Reviewed-by: Michael Meeks <michael.me...@collabora.com> Tested-by: Michael Meeks <michael.me...@collabora.com> diff --git a/common/Protocol.hpp b/common/Protocol.hpp index e95090d..67f7ec7 100644 --- a/common/Protocol.hpp +++ b/common/Protocol.hpp @@ -77,16 +77,65 @@ namespace LOOLProtocol // Functions that parse messages. All return false if parsing fails bool parseStatus(const std::string& message, LibreOfficeKitDocumentType& type, int& nParts, int& currentPart, int& width, int& height); + /// Tokenize space-delimited values until we hit new-line or the end. inline - std::string getDelimitedInitialSubstring(const char *message, const int length, const char delim) + std::vector<std::string> tokenize(const char* data, const size_t size) { - if (message == nullptr || length <= 0) + std::vector<std::string> tokens; + if (size == 0 || data == nullptr) + { + return tokens; + } + + const char* start = data; + const char* end = data; + for (size_t i = 0; i < size && data[i] != '\n'; ++i, ++end) + { + if (data[i] == ' ') + { + if (start != end && *start != ' ') + { + tokens.emplace_back(start, end); + } + + start = end; + } + else if (*start == ' ') + { + ++start; + } + } + + if (start != end && *start != ' ' && *start != '\n') { - return ""; + tokens.emplace_back(start, end); } - const char *founddelim = static_cast<const char *>(std::memchr(message, delim, length)); - const auto size = (founddelim == nullptr ? length : founddelim - message); + return tokens; + } + + inline + std::vector<std::string> tokenize(const std::string& s) + { + return tokenize(s.data(), s.size()); + } + + inline size_t getDelimiterPosition(const char* message, const int length, const char delim) + { + if (message && length > 0) + { + const char *founddelim = static_cast<const char *>(std::memchr(message, delim, length)); + const auto size = (founddelim == nullptr ? length : founddelim - message); + return size; + } + + return 0; + } + + inline + std::string getDelimitedInitialSubstring(const char *message, const int length, const char delim) + { + const auto size = getDelimiterPosition(message, length, delim); return std::string(message, size); } @@ -170,7 +219,7 @@ namespace LOOLProtocol { if (message == nullptr || length <= 0) { - return ""; + return std::string(); } const auto firstLine = getFirstLine(message, std::min(length, 120)); @@ -184,6 +233,19 @@ namespace LOOLProtocol return firstLine; } + inline std::string getAbbreviatedMessage(const std::string& message) + { + const auto pos = getDelimiterPosition(message.data(), std::min(message.size(), 120UL), '\n'); + + // If first line is less than the length (minus newline), add ellipsis. + if (pos < static_cast<std::string::size_type>(message.size()) - 1) + { + return message.substr(0, pos) + "..."; + } + + return message; + } + template <typename T> std::string getAbbreviatedMessage(const T& message) { diff --git a/test/TileQueueTests.cpp b/test/TileQueueTests.cpp index dea692b..4587cc4 100644 --- a/test/TileQueueTests.cpp +++ b/test/TileQueueTests.cpp @@ -14,6 +14,7 @@ #include "Common.hpp" #include "Protocol.hpp" #include "MessageQueue.hpp" +#include "SenderQueue.hpp" #include "Util.hpp" namespace CPPUNIT_NS @@ -44,6 +45,9 @@ class TileQueueTests : public CPPUNIT_NS::TestFixture CPPUNIT_TEST(testTileRecombining); CPPUNIT_TEST(testViewOrder); CPPUNIT_TEST(testPreviewsDeprioritization); + CPPUNIT_TEST(testSenderQueue); + CPPUNIT_TEST(testSenderQueueTileDeduplication); + CPPUNIT_TEST(testInvalidateViewCursorDeduplication); CPPUNIT_TEST_SUITE_END(); @@ -52,6 +56,9 @@ class TileQueueTests : public CPPUNIT_NS::TestFixture void testTileRecombining(); void testViewOrder(); void testPreviewsDeprioritization(); + void testSenderQueue(); + void testSenderQueueTileDeduplication(); + void testInvalidateViewCursorDeduplication(); }; void TileQueueTests::testTileQueuePriority() @@ -259,6 +266,154 @@ void TileQueueTests::testPreviewsDeprioritization() CPPUNIT_ASSERT_EQUAL(0, static_cast<int>(queue._queue.size())); } +void TileQueueTests::testSenderQueue() +{ + SenderQueue<std::shared_ptr<MessagePayload>> queue; + + std::shared_ptr<MessagePayload> item; + + // Empty queue + CPPUNIT_ASSERT_EQUAL(false, queue.waitDequeue(item, 10)); + CPPUNIT_ASSERT_EQUAL(0UL, queue.size()); + + const std::vector<std::string> messages = + { + "message 1", + "message 2", + "message 3" + }; + + for (const auto& msg : messages) + { + queue.enqueue(std::make_shared<MessagePayload>(msg)); + } + + CPPUNIT_ASSERT_EQUAL(3UL, queue.size()); + + CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0)); + CPPUNIT_ASSERT_EQUAL(2UL, queue.size()); + CPPUNIT_ASSERT_EQUAL(messages[0], std::string(item->data().data(), item->data().size())); + + CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0)); + CPPUNIT_ASSERT_EQUAL(1UL, queue.size()); + CPPUNIT_ASSERT_EQUAL(messages[1], std::string(item->data().data(), item->data().size())); + + CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0)); + CPPUNIT_ASSERT_EQUAL(0UL, queue.size()); + CPPUNIT_ASSERT_EQUAL(messages[2], std::string(item->data().data(), item->data().size())); + + CPPUNIT_ASSERT_EQUAL(0UL, queue.size()); +} + +void TileQueueTests::testSenderQueueTileDeduplication() +{ + SenderQueue<std::shared_ptr<MessagePayload>> queue; + + std::shared_ptr<MessagePayload> item; + + // Empty queue + CPPUNIT_ASSERT_EQUAL(false, queue.waitDequeue(item, 10)); + CPPUNIT_ASSERT_EQUAL(0UL, queue.size()); + + const std::vector<std::string> part_messages = + { + "tile: part=0 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=0", + "tile: part=1 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=1", + "tile: part=2 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=-1" + }; + + for (const auto& msg : part_messages) + { + queue.enqueue(std::make_shared<MessagePayload>(msg)); + } + + CPPUNIT_ASSERT_EQUAL(3UL, queue.size()); + CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10)); + CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10)); + CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10)); + + CPPUNIT_ASSERT_EQUAL(0UL, queue.size()); + + const std::vector<std::string> dup_messages = + { + "tile: part=0 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=-1", + "tile: part=0 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=1", + "tile: part=0 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=1" + }; + + for (const auto& msg : dup_messages) + { + queue.enqueue(std::make_shared<MessagePayload>(msg)); + } + + CPPUNIT_ASSERT_EQUAL(1UL, queue.size()); + CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10)); + + // The last one should persist. + CPPUNIT_ASSERT_EQUAL(dup_messages[2], std::string(item->data().data(), item->data().size())); + + CPPUNIT_ASSERT_EQUAL(0UL, queue.size()); +} + +void TileQueueTests::testInvalidateViewCursorDeduplication() +{ + SenderQueue<std::shared_ptr<MessagePayload>> queue; + + std::shared_ptr<MessagePayload> item; + + // Empty queue + CPPUNIT_ASSERT_EQUAL(false, queue.waitDequeue(item, 10)); + CPPUNIT_ASSERT_EQUAL(0UL, queue.size()); + + const std::vector<std::string> view_messages = + { + "invalidateviewcursor: { \"viewId\": \"1\", \"rectangle\": \"3999, 1418, 0, 298\", \"part\": \"0\" }", + "invalidateviewcursor: { \"viewId\": \"2\", \"rectangle\": \"3999, 1418, 0, 298\", \"part\": \"0\" }", + "invalidateviewcursor: { \"viewId\": \"3\", \"rectangle\": \"3999, 1418, 0, 298\", \"part\": \"0\" }", + }; + + for (const auto& msg : view_messages) + { + queue.enqueue(std::make_shared<MessagePayload>(msg)); + } + + CPPUNIT_ASSERT_EQUAL(3UL, queue.size()); + + CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0)); + CPPUNIT_ASSERT_EQUAL(2UL, queue.size()); + CPPUNIT_ASSERT_EQUAL(view_messages[0], std::string(item->data().data(), item->data().size())); + + CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0)); + CPPUNIT_ASSERT_EQUAL(1UL, queue.size()); + CPPUNIT_ASSERT_EQUAL(view_messages[1], std::string(item->data().data(), item->data().size())); + + CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0)); + CPPUNIT_ASSERT_EQUAL(0UL, queue.size()); + CPPUNIT_ASSERT_EQUAL(view_messages[2], std::string(item->data().data(), item->data().size())); + + CPPUNIT_ASSERT_EQUAL(0UL, queue.size()); + + const std::vector<std::string> dup_messages = + { + "invalidateviewcursor: { \"viewId\": \"1\", \"rectangle\": \"3999, 1418, 0, 298\", \"part\": \"0\" }", + "invalidateviewcursor: { \"viewId\": \"1\", \"rectangle\": \"1000, 1418, 0, 298\", \"part\": \"0\" }", + "invalidateviewcursor: { \"viewId\": \"1\", \"rectangle\": \"2000, 1418, 0, 298\", \"part\": \"0\" }", + }; + + for (const auto& msg : dup_messages) + { + queue.enqueue(std::make_shared<MessagePayload>(msg)); + } + + CPPUNIT_ASSERT_EQUAL(1UL, queue.size()); + CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0)); + + // The last one should persist. + CPPUNIT_ASSERT_EQUAL(dup_messages[2], std::string(item->data().data(), item->data().size())); + + CPPUNIT_ASSERT_EQUAL(0UL, queue.size()); +} + CPPUNIT_TEST_SUITE_REGISTRATION(TileQueueTests); /* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/test/WhiteBoxTests.cpp b/test/WhiteBoxTests.cpp index 6ca75bc..75b9ec6 100644 --- a/test/WhiteBoxTests.cpp +++ b/test/WhiteBoxTests.cpp @@ -24,6 +24,8 @@ class WhiteBoxTests : public CPPUNIT_NS::TestFixture CPPUNIT_TEST_SUITE(WhiteBoxTests); CPPUNIT_TEST(testLOOLProtocolFunctions); + CPPUNIT_TEST(testMessageAbbreviation); + CPPUNIT_TEST(testTokenizer); CPPUNIT_TEST(testRegexListMatcher); CPPUNIT_TEST(testRegexListMatcher_Init); CPPUNIT_TEST(testEmptyCellCursor); @@ -31,6 +33,8 @@ class WhiteBoxTests : public CPPUNIT_NS::TestFixture CPPUNIT_TEST_SUITE_END(); void testLOOLProtocolFunctions(); + void testMessageAbbreviation(); + void testTokenizer(); void testRegexListMatcher(); void testRegexListMatcher_Init(); void testEmptyCellCursor(); @@ -75,6 +79,103 @@ void WhiteBoxTests::testLOOLProtocolFunctions() } +void WhiteBoxTests::testMessageAbbreviation() +{ + CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getDelimitedInitialSubstring(nullptr, 5, '\n')); + CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getDelimitedInitialSubstring(nullptr, -1, '\n')); + CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getDelimitedInitialSubstring("abc", 0, '\n')); + CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getDelimitedInitialSubstring("abc", -1, '\n')); + CPPUNIT_ASSERT_EQUAL(std::string("ab"), LOOLProtocol::getDelimitedInitialSubstring("abc", 2, '\n')); + + CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getAbbreviatedMessage(nullptr, 5)); + CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getAbbreviatedMessage(nullptr, -1)); + CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getAbbreviatedMessage("abc", 0)); + CPPUNIT_ASSERT_EQUAL(std::string(), LOOLProtocol::getAbbreviatedMessage("abc", -1)); + CPPUNIT_ASSERT_EQUAL(std::string("ab"), LOOLProtocol::getAbbreviatedMessage("abc", 2)); + + std::string s; + std::string abbr; + + s = "abcdefg"; + CPPUNIT_ASSERT_EQUAL(s, LOOLProtocol::getAbbreviatedMessage(s)); + + s = "1234567890123\n45678901234567890123456789012345678901234567890123"; + abbr = "1234567890123..."; + CPPUNIT_ASSERT_EQUAL(abbr, LOOLProtocol::getAbbreviatedMessage(s.data(), s.size())); + CPPUNIT_ASSERT_EQUAL(abbr, LOOLProtocol::getAbbreviatedMessage(s)); + + // 120 characters. Change when the abbreviation max-length changes. + s = "123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"; + CPPUNIT_ASSERT_EQUAL(s, LOOLProtocol::getAbbreviatedMessage(s.data(), s.size())); + CPPUNIT_ASSERT_EQUAL(s, LOOLProtocol::getAbbreviatedMessage(s)); + + abbr = s + "..."; + s += "more data"; + CPPUNIT_ASSERT_EQUAL(abbr, LOOLProtocol::getAbbreviatedMessage(s.data(), s.size())); + CPPUNIT_ASSERT_EQUAL(abbr, LOOLProtocol::getAbbreviatedMessage(s)); +} + +void WhiteBoxTests::testTokenizer() +{ + std::vector<std::string> tokens; + + tokens = LOOLProtocol::tokenize(""); + CPPUNIT_ASSERT_EQUAL(0UL, tokens.size()); + + tokens = LOOLProtocol::tokenize(" "); + CPPUNIT_ASSERT_EQUAL(0UL, tokens.size()); + + tokens = LOOLProtocol::tokenize("A"); + CPPUNIT_ASSERT_EQUAL(1UL, tokens.size()); + CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]); + + tokens = LOOLProtocol::tokenize(" A"); + CPPUNIT_ASSERT_EQUAL(1UL, tokens.size()); + CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]); + + tokens = LOOLProtocol::tokenize("A "); + CPPUNIT_ASSERT_EQUAL(1UL, tokens.size()); + CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]); + + tokens = LOOLProtocol::tokenize(" A "); + CPPUNIT_ASSERT_EQUAL(1UL, tokens.size()); + CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]); + + tokens = LOOLProtocol::tokenize(" A Z "); + CPPUNIT_ASSERT_EQUAL(2UL, tokens.size()); + CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]); + CPPUNIT_ASSERT_EQUAL(std::string("Z"), tokens[1]); + + tokens = LOOLProtocol::tokenize("\n"); + CPPUNIT_ASSERT_EQUAL(0UL, tokens.size()); + + tokens = LOOLProtocol::tokenize(" A \nZ "); + CPPUNIT_ASSERT_EQUAL(1UL, tokens.size()); + CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]); + + tokens = LOOLProtocol::tokenize(" A Z\n "); + CPPUNIT_ASSERT_EQUAL(2UL, tokens.size()); + CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]); + CPPUNIT_ASSERT_EQUAL(std::string("Z"), tokens[1]); + + tokens = LOOLProtocol::tokenize(" A Z \n "); + CPPUNIT_ASSERT_EQUAL(2UL, tokens.size()); + CPPUNIT_ASSERT_EQUAL(std::string("A"), tokens[0]); + CPPUNIT_ASSERT_EQUAL(std::string("Z"), tokens[1]); + + tokens = LOOLProtocol::tokenize("tile part=0 width=256 height=256 tileposx=0 tileposy=0 tilewidth=3840 tileheight=3840 ver=-1"); + CPPUNIT_ASSERT_EQUAL(9UL, tokens.size()); + CPPUNIT_ASSERT_EQUAL(std::string("tile"), tokens[0]); + CPPUNIT_ASSERT_EQUAL(std::string("part=0"), tokens[1]); + CPPUNIT_ASSERT_EQUAL(std::string("width=256"), tokens[2]); + CPPUNIT_ASSERT_EQUAL(std::string("height=256"), tokens[3]); + CPPUNIT_ASSERT_EQUAL(std::string("tileposx=0"), tokens[4]); + CPPUNIT_ASSERT_EQUAL(std::string("tileposy=0"), tokens[5]); + CPPUNIT_ASSERT_EQUAL(std::string("tilewidth=3840"), tokens[6]); + CPPUNIT_ASSERT_EQUAL(std::string("tileheight=3840"), tokens[7]); + CPPUNIT_ASSERT_EQUAL(std::string("ver=-1"), tokens[8]); +} + void WhiteBoxTests::testRegexListMatcher() { Util::RegexListMatcher matcher; diff --git a/wsd/ClientSession.hpp b/wsd/ClientSession.hpp index bb94bff..ae233e9 100644 --- a/wsd/ClientSession.hpp +++ b/wsd/ClientSession.hpp @@ -48,18 +48,14 @@ public: bool sendBinaryFrame(const char* buffer, int length) override { - auto payload = std::make_shared<MessagePayload>(length, MessagePayload::Type::Binary); - auto& output = payload->data(); - std::memcpy(output.data(), buffer, length); + auto payload = std::make_shared<MessagePayload>(buffer, length, MessagePayload::Type::Binary); enqueueSendMessage(payload); return true; } bool sendTextFrame(const char* buffer, const int length) override { - auto payload = std::make_shared<MessagePayload>(length, MessagePayload::Type::Text); - auto& output = payload->data(); - std::memcpy(output.data(), buffer, length); + auto payload = std::make_shared<MessagePayload>(buffer, length, MessagePayload::Type::Text); enqueueSendMessage(payload); return true; } diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp index 513e12b..6b061ea 100644 --- a/wsd/DocumentBroker.cpp +++ b/wsd/DocumentBroker.cpp @@ -617,9 +617,7 @@ void DocumentBroker::alertAllUsers(const std::string& msg) { Util::assertIsLocked(_mutex); - auto payload = std::make_shared<MessagePayload>(msg.size(), MessagePayload::Type::Text); - auto& output = payload->data(); - std::memcpy(output.data(), msg.data(), msg.size()); + auto payload = std::make_shared<MessagePayload>(msg); LOG_DBG("Alerting all users of [" << _docKey << "]: " << msg); for (auto& it : _sessions) diff --git a/wsd/SenderQueue.hpp b/wsd/SenderQueue.hpp index b1ded81..44742ab 100644 --- a/wsd/SenderQueue.hpp +++ b/wsd/SenderQueue.hpp @@ -16,31 +16,101 @@ #include <mutex> #include <vector> +#include <Poco/Dynamic/Var.h> +#include <Poco/JSON/JSON.h> +#include <Poco/JSON/Object.h> +#include <Poco/JSON/Parser.h> + #include "common/SigUtil.hpp" #include "LOOLWebSocket.hpp" #include "Log.hpp" +#include "TileDesc.hpp" /// The payload type used to send/receive data. class MessagePayload { public: - enum class Type { Text, Binary }; + enum class Type { Text, JSON, Binary }; - MessagePayload(const size_t size, enum Type type) : - _data(size), + /// Construct a text message. + /// message must include the full first-line. + MessagePayload(const std::string& message, + const enum Type type = Type::Text) : + _data(message.data(), message.data() + message.size()), + _tokens(LOOLProtocol::tokenize(_data.data(), _data.size())), + _firstLine(LOOLProtocol::getFirstLine(_data.data(), _data.size())), + _abbreviation(LOOLProtocol::getAbbreviatedMessage(_data.data(), _data.size())), _type(type) { } - std::vector<char>& data() { return _data; } + /// Construct a message from a string with type and + /// reserve extra space (total, including message). + /// message must include the full first-line. + MessagePayload(const std::string& message, + const enum Type type, + const size_t reserve) : + _data(std::max(reserve, message.size())), + _tokens(LOOLProtocol::tokenize(_data.data(), _data.size())), + _firstLine(LOOLProtocol::getFirstLine(_data.data(), _data.size())), + _abbreviation(LOOLProtocol::getAbbreviatedMessage(_data.data(), _data.size())), + _type(type) + { + _data.resize(message.size()); + std::memcpy(_data.data(), message.data(), message.size()); + } + + /// Construct a message from a character array with type. + /// data must be include the full first-line. + MessagePayload(const char* data, + const size_t size, + const enum Type type) : + _data(data, data + size), + _tokens(LOOLProtocol::tokenize(_data.data(), _data.size())), + _firstLine(LOOLProtocol::getFirstLine(_data.data(), _data.size())), + _abbreviation(LOOLProtocol::getAbbreviatedMessage(_data.data(), _data.size())), + _type(type) + { + } + + size_t size() const { return _data.size(); } + const std::vector<char>& data() const { return _data; } + + const std::vector<std::string>& tokens() const { return _tokens; } + const std::string& firstToken() const { return _tokens[0]; } + const std::string& firstLine() const { return _firstLine; } + const std::string& abbreviation() const { return _abbreviation; } + + /// Returns the json part of the message, if any. + std::string jsonString() const + { + if (_tokens.size() > 1 && _tokens[1] == "{") + { + const auto firstTokenSize = _tokens[0].size(); + return std::string(_data.data() + firstTokenSize, _data.size() - firstTokenSize); + } + + return std::string(); + } + + /// Append more data to the message. + void append(const char* data, const size_t size) + { + const auto curSize = _data.size(); + _data.resize(curSize + size); + std::memcpy(_data.data() + curSize, data, size); + } /// Returns true if and only if the payload is considered Binary. bool isBinary() const { return _type == Type::Binary; } private: std::vector<char> _data; - Type _type; + const std::vector<std::string> _tokens; + const std::string _firstLine; + const std::string _abbreviation; + const Type _type; }; struct SendItem @@ -74,7 +144,10 @@ public: std::unique_lock<std::mutex> lock(_mutex); if (!stopping()) { - _queue.push_back(item); + if (deduplicate(item)) + { + _queue.push_back(item); + } } const size_t queuesize = _queue.size(); @@ -115,9 +188,83 @@ public: } private: + /// Deduplicate messages based on the new one. + /// Returns true if the new message should be + /// enqueued, otherwise false. + bool deduplicate(const Item& item) + { + // Deduplicate messages based on the incoming one. + const std::string command = item->firstToken(); + if (command == "tile:") + { + // Remove previous identical tile, if any, and use most recent (incoming). + const TileDesc newTile = TileDesc::parse(item->firstLine()); + const auto& pos = std::find_if(_queue.begin(), _queue.end(), + [&newTile](const queue_item_t& cur) + { + return (cur->firstToken() == "tile:" && + newTile == TileDesc::parse(cur->firstLine())); + }); + + if (pos != _queue.end()) + { + _queue.erase(pos); + } + } + else if (command == "statusindicatorsetvalue:" || + command == "invalidatecursor:") + { + // Remove previous identical enties of this command, + // if any, and use most recent (incoming). + const auto& pos = std::find_if(_queue.begin(), _queue.end(), + [&command](const queue_item_t& cur) + { + return (cur->firstToken() == command); + }); + + if (pos != _queue.end()) + { + _queue.erase(pos); + } + } + else if (command == "invalidateviewcursor:") + { + // Remove previous cursor invalidation for same view, + // if any, and use most recent (incoming). + const std::string newMsg = item->jsonString(); + Poco::JSON::Parser newParser; + const auto newResult = newParser.parse(newMsg); + const auto& newJson = newResult.extract<Poco::JSON::Object::Ptr>(); + const auto viewId = newJson->get("viewId").toString(); + const auto& pos = std::find_if(_queue.begin(), _queue.end(), + [command, viewId](const queue_item_t& cur) + { + if (cur->firstToken() == command) + { + const std::string msg = cur->jsonString(); + Poco::JSON::Parser parser; + const auto result = parser.parse(msg); + const auto& json = result.extract<Poco::JSON::Object::Ptr>(); + return (viewId == json->get("viewId").toString()); + } + + return false; + }); + + if (pos != _queue.end()) + { + _queue.erase(pos); + } + } + + return true; + } + +private: mutable std::mutex _mutex; std::condition_variable _cv; std::deque<Item> _queue; + typedef typename std::deque<Item>::value_type queue_item_t; std::atomic<bool> _stop; }; diff --git a/wsd/TileCache.cpp b/wsd/TileCache.cpp index 7a536e1..9bac2a6 100644 --- a/wsd/TileCache.cpp +++ b/wsd/TileCache.cpp @@ -173,16 +173,12 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const std::string response = tile.serialize("tile:"); LOG_DBG("Sending tile message to " << subscriberCount << " subscribers: " + response); - std::shared_ptr<MessagePayload> payload = std::make_shared<MessagePayload>(response.size() + 1 + size, - MessagePayload::Type::Binary); - { - auto& output = payload->data(); - - // Send to first subscriber as-is (without cache marker). - std::memcpy(output.data(), response.data(), response.size()); - output[response.size()] = '\n'; - std::memcpy(output.data() + response.size() + 1, data, size); - } + // Send to first subscriber as-is (without cache marker). + auto payload = std::make_shared<MessagePayload>(response, + MessagePayload::Type::Binary, + response.size() + 1 + size); + payload->append("\n", 1); + payload->append(data, size); auto& firstSubscriber = tileBeingRendered->_subscribers[0]; auto firstSession = firstSubscriber.lock(); @@ -198,11 +194,10 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const // Create a new Payload. payload.reset(); - payload = std::make_shared<MessagePayload>(response.size() + size, MessagePayload::Type::Binary); - auto& output = payload->data(); - - std::memcpy(output.data(), response.data(), response.size()); - std::memcpy(output.data() + response.size(), data, size); + payload = std::make_shared<MessagePayload>(response, + MessagePayload::Type::Binary, + response.size() + size); + payload->append(data, size); for (size_t i = 1; i < subscriberCount; ++i) { diff --git a/wsd/TileDesc.hpp b/wsd/TileDesc.hpp index 765341d..9721deb 100644 --- a/wsd/TileDesc.hpp +++ b/wsd/TileDesc.hpp @@ -65,6 +65,19 @@ public: int getId() const { return _id; } bool getBroadcast() const { return _broadcast; } + bool operator==(const TileDesc& other) const + { + return _part == other._part && + _width == other._width && + _height == other._height && + _tilePosX == other._tilePosX && + _tilePosY == other._tilePosY && + _tileWidth == other._tileWidth && + _tileHeight == other._tileHeight && + _id == other._id && + _broadcast == other._broadcast; + } + bool intersectsWithRect(int x, int y, int w, int h) const { return x + w >= getTilePosX() && _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits