This is an automated email from the ASF dual-hosted git repository.

swebb2066 pushed a commit to branch simplify_socket_appender
in repository https://gitbox.apache.org/repos/asf/logging-log4cxx.git

commit 224e47cfb46a2018e5b087ad042162bfb8dc292b
Author: Stephen Webb <[email protected]>
AuthorDate: Wed Jan 15 14:43:09 2025 +1100

    Reduce SocketAppender overhead
---
 src/main/cpp/aprsocket.cpp                         |   5 +
 src/main/cpp/socketappenderskeleton.cpp            |  98 ++++++++-----
 src/main/cpp/threadutility.cpp                     |  41 +++---
 .../include/log4cxx/net/socketappenderskeleton.h   |   2 +-
 src/main/include/log4cxx/private/aprsocket.h       |   2 +
 .../log4cxx/private/socketappenderskeleton_priv.h  |  29 ++--
 src/test/cpp/net/CMakeLists.txt                    |   1 +
 src/test/cpp/net/socketappendertestcase.cpp        | 154 ++++++++++++++-------
 8 files changed, 216 insertions(+), 116 deletions(-)

diff --git a/src/main/cpp/aprsocket.cpp b/src/main/cpp/aprsocket.cpp
index c879b513..2ed76bda 100644
--- a/src/main/cpp/aprsocket.cpp
+++ b/src/main/cpp/aprsocket.cpp
@@ -159,5 +159,10 @@ void APRSocket::close()
        }
 }
 
+apr_socket_t* APRSocket::getSocketPtr() const
+{
+       return _priv->socket;
+}
+
 } //namespace helpers
 } //namespace log4cxx
diff --git a/src/main/cpp/socketappenderskeleton.cpp b/src/main/cpp/socketappenderskeleton.cpp
index 993294bb..fdba18a8 100644
--- a/src/main/cpp/socketappenderskeleton.cpp
+++ b/src/main/cpp/socketappenderskeleton.cpp
@@ -21,6 +21,7 @@
 #include <log4cxx/helpers/optionconverter.h>
 #include <log4cxx/helpers/stringhelper.h>
 #include <log4cxx/spi/loggingevent.h>
+#include <log4cxx/helpers/threadutility.h>
 #include <log4cxx/helpers/transcoder.h>
 #include <log4cxx/helpers/bytearrayoutputstream.h>
 #include <log4cxx/helpers/threadutility.h>
@@ -36,17 +37,17 @@ using namespace LOG4CXX_NS::net;
 #define _priv static_cast<SocketAppenderSkeletonPriv*>(m_priv.get())
 
 SocketAppenderSkeleton::SocketAppenderSkeleton(int defaultPort, int 
reconnectionDelay)
-    : 
AppenderSkeleton(std::make_unique<SocketAppenderSkeletonPriv>(defaultPort, 
reconnectionDelay))
+       : 
AppenderSkeleton(std::make_unique<SocketAppenderSkeletonPriv>(defaultPort, 
reconnectionDelay))
 {
 }
 
 SocketAppenderSkeleton::SocketAppenderSkeleton(helpers::InetAddressPtr 
address, int port, int reconnectionDelay)
-    : AppenderSkeleton(std::make_unique<SocketAppenderSkeletonPriv>(address, 
port, reconnectionDelay))
+       : 
AppenderSkeleton(std::make_unique<SocketAppenderSkeletonPriv>(address, port, 
reconnectionDelay))
 {
 }
 
 SocketAppenderSkeleton::SocketAppenderSkeleton(const LogString& host, int 
port, int reconnectionDelay)
-    : AppenderSkeleton(std::make_unique<SocketAppenderSkeletonPriv>(host, 
port, reconnectionDelay))
+       : AppenderSkeleton(std::make_unique<SocketAppenderSkeletonPriv>(host, 
port, reconnectionDelay))
 {
 }
 
@@ -68,8 +69,8 @@ void SocketAppenderSkeleton::activateOptions(Pool& p)
 
 void SocketAppenderSkeleton::close()
 {
-    _priv->stopMonitor();
-    cleanUp(_priv->pool);
+       _priv->stopMonitor();
+       cleanUp(_priv->pool);
 }
 
 void SocketAppenderSkeleton::connect(Pool& p)
@@ -136,22 +137,47 @@ void SocketAppenderSkeleton::setOption(const LogString& option, const LogString&
 void SocketAppenderSkeleton::fireConnector()
 {
        std::lock_guard<std::recursive_mutex> lock(_priv->mutex);
-
-       if ( !_priv->thread.joinable() )
-       {
-               LogLog::debug(LOG4CXX_STR("Connector thread not alive: starting 
monitor."));
-
-               _priv->thread = ThreadUtility::instance()->createThread( 
LOG4CXX_STR("SocketAppend"), &SocketAppenderSkeleton::monitor, this );
-       }
+    if (_priv->taskName.empty())
+    {
+        Pool p;
+        _priv->taskName = _priv->name + LOG4CXX_STR(":")
+            + _priv->address->toString() + LOG4CXX_STR(":");
+        StringHelper::toString(_priv->port, p, _priv->taskName);
+    }
+    auto taskManager = ThreadUtility::instancePtr();
+    if (!taskManager->value().hasPeriodicTask(_priv->taskName))
+    {
+        Pool p;
+        if (LogLog::isDebugEnabled())
+        {
+            Pool p;
+            LogString msg(LOG4CXX_STR("Waiting "));
+            StringHelper::toString(_priv->reconnectionDelay, p, msg);
+            msg += LOG4CXX_STR(" ms before retrying [")
+                + _priv->address->toString() + LOG4CXX_STR(":");
+            StringHelper::toString(_priv->port, p, msg);
+            msg += LOG4CXX_STR("].");
+            LogLog::debug(msg);
+        }
+        taskManager->value().addPeriodicTask(_priv->taskName
+            , std::bind(&SocketAppenderSkeleton::retryConnect, this)
+            , std::chrono::milliseconds(_priv->reconnectionDelay)
+            );
+    }
+    _priv->taskManager = taskManager;
 }
 
-void SocketAppenderSkeleton::monitor()
+void SocketAppenderSkeleton::retryConnect()
 {
-       Pool p;
-       SocketPtr socket;
-
-       while (!is_closed())
+       if (is_closed())
        {
+               if (auto pManager = _priv->taskManager.lock())
+                       pManager->value().removePeriodicTask(_priv->taskName);
+       }
+       else
+       {
+               Pool p;
+               SocketPtr socket;
                try
                {
                        if (LogLog::isDebugEnabled())
@@ -166,8 +192,14 @@ void SocketAppenderSkeleton::monitor()
                        setSocket(socket, p);
                        if (LogLog::isDebugEnabled())
                        {
-                               LogLog::debug(LOG4CXX_STR("Connection 
established. Exiting connector thread."));
+                               LogString msg(LOG4CXX_STR("Connection 
established to [")
+                                       + _priv->address->toString() + 
LOG4CXX_STR(":"));
+                               StringHelper::toString(_priv->port, p, msg);
+                               msg += LOG4CXX_STR("].");
+                               LogLog::debug(msg);
                        }
+                       if (auto pManager = _priv->taskManager.lock())
+                               
pManager->value().removePeriodicTask(_priv->taskName);
                        return;
                }
                catch (ConnectException& e)
@@ -197,26 +229,17 @@ void SocketAppenderSkeleton::monitor()
                                msg += LOG4CXX_STR("].");
                                LogLog::debug(msg);
                        }
-
-                       std::unique_lock<std::mutex> lock( 
_priv->interrupt_mutex );
-                       if (_priv->interrupt.wait_for( lock, 
std::chrono::milliseconds( _priv->reconnectionDelay ),
-                               std::bind(&SocketAppenderSkeleton::is_closed, 
this) ))
-                               break;
                }
        }
 }
 
 void SocketAppenderSkeleton::SocketAppenderSkeletonPriv::stopMonitor()
 {
-       {
-               std::lock_guard<std::mutex> lock(this->interrupt_mutex);
-               if (this->closed)
-                       return;
-               this->closed = true;
-       }
-       this->interrupt.notify_all();
-       if (this->thread.joinable())
-               this->thread.join();
+       this->closed = true;
+       if (this->taskName.empty())
+               ;
+       else if (auto pManager = this->taskManager.lock())
+               pManager->value().removePeriodicTask(this->taskName);
 }
 
 bool SocketAppenderSkeleton::is_closed()
@@ -258,6 +281,17 @@ bool SocketAppenderSkeleton::getLocationInfo() const
 void SocketAppenderSkeleton::setReconnectionDelay(int reconnectionDelay1)
 {
        _priv->reconnectionDelay = reconnectionDelay1;
+       if (_priv->taskName.empty())
+               return;
+       auto pManager = _priv->taskManager.lock();
+       if (pManager && pManager->value().hasPeriodicTask(_priv->taskName))
+       {
+               pManager->value().removePeriodicTask(_priv->taskName);
+               pManager->value().addPeriodicTask(_priv->taskName
+                       , std::bind(&SocketAppenderSkeleton::retryConnect, this)
+                       , std::chrono::milliseconds(_priv->reconnectionDelay)
+                       );
+       }
 }
 
 int SocketAppenderSkeleton::getReconnectionDelay() const
diff --git a/src/main/cpp/threadutility.cpp b/src/main/cpp/threadutility.cpp
index 72e87248..9175f177 100644
--- a/src/main/cpp/threadutility.cpp
+++ b/src/main/cpp/threadutility.cpp
@@ -69,16 +69,17 @@ struct ThreadUtility::priv_data
                LogString             name;
                Period                delay;
                TimePoint             nextRun;
-               int                   errorCount;
                std::function<void()> f;
+               int                   errorCount{ 0 };
+               bool                  removed{ false };
        };
        using JobStore = std::list<NamedPeriodicFunction>;
        JobStore                  jobs;
-       std::mutex                job_mutex;
+       std::recursive_mutex      job_mutex;
        std::thread               thread;
        std::condition_variable   interrupt;
        std::mutex                interrupt_mutex;
-       bool                      terminated{false};
+       bool                      terminated{ false };
        int                       retryCount{ 2 };
        Period                    maxDelay{ 0 };
 
@@ -264,11 +265,11 @@ ThreadStartPost ThreadUtility::postStartFunction()
  */
 void ThreadUtility::addPeriodicTask(const LogString& name, 
std::function<void()> f, const Period& delay)
 {
-       std::lock_guard<std::mutex> lock(m_priv->job_mutex);
+       std::lock_guard<std::recursive_mutex> lock(m_priv->job_mutex);
        if (m_priv->maxDelay < delay)
                m_priv->maxDelay = delay;
        auto currentTime = std::chrono::system_clock::now();
-       m_priv->jobs.push_back( priv_data::NamedPeriodicFunction{name, delay, 
currentTime + delay, 0, f} );
+       m_priv->jobs.push_back( priv_data::NamedPeriodicFunction{name, delay, 
currentTime + delay, f} );
        if (!m_priv->thread.joinable())
        {
                m_priv->terminated = false;
@@ -283,10 +284,10 @@ void ThreadUtility::addPeriodicTask(const LogString& name, std::function<void()>
  */
 bool ThreadUtility::hasPeriodicTask(const LogString& name)
 {
-       std::lock_guard<std::mutex> lock(m_priv->job_mutex);
+       std::lock_guard<std::recursive_mutex> lock(m_priv->job_mutex);
        auto pItem = std::find_if(m_priv->jobs.begin(), m_priv->jobs.end()
                , [&name](const priv_data::NamedPeriodicFunction& item)
-               { return name == item.name; }
+               { return !item.removed && name == item.name; }
                );
        return m_priv->jobs.end() != pItem;
 }
@@ -297,7 +298,7 @@ bool ThreadUtility::hasPeriodicTask(const LogString& name)
 void ThreadUtility::removeAllPeriodicTasks()
 {
        {
-               std::lock_guard<std::mutex> lock(m_priv->job_mutex);
+               std::lock_guard<std::recursive_mutex> lock(m_priv->job_mutex);
                while (!m_priv->jobs.empty())
                        m_priv->jobs.pop_back();
        }
@@ -309,14 +310,14 @@ void ThreadUtility::removeAllPeriodicTasks()
  */
 void ThreadUtility::removePeriodicTask(const LogString& name)
 {
-       std::lock_guard<std::mutex> lock(m_priv->job_mutex);
+       std::lock_guard<std::recursive_mutex> lock(m_priv->job_mutex);
        auto pItem = std::find_if(m_priv->jobs.begin(), m_priv->jobs.end()
                , [&name](const priv_data::NamedPeriodicFunction& item)
-               { return name == item.name; }
+               { return !item.removed && name == item.name; }
                );
        if (m_priv->jobs.end() != pItem)
        {
-               m_priv->jobs.erase(pItem);
+               pItem->removed = true;
                m_priv->interrupt.notify_one();
        }
 }
@@ -328,14 +329,14 @@ void ThreadUtility::removePeriodicTasksMatching(const LogString& namePrefix)
 {
        while (1)
        {
-               std::lock_guard<std::mutex> lock(m_priv->job_mutex);
+               std::lock_guard<std::recursive_mutex> lock(m_priv->job_mutex);
                auto pItem = std::find_if(m_priv->jobs.begin(), 
m_priv->jobs.end()
                        , [&namePrefix](const priv_data::NamedPeriodicFunction& 
item)
-                       { return namePrefix.size() <= item.name.size() && 
item.name.substr(0, namePrefix.size()) == namePrefix; }
+                       { return !item.removed && namePrefix.size() <= 
item.name.size() && item.name.substr(0, namePrefix.size()) == namePrefix; }
                        );
                if (m_priv->jobs.end() == pItem)
                        break;
-               m_priv->jobs.erase(pItem);
+               pItem->removed = true;
        }
        m_priv->interrupt.notify_one();
 }
@@ -348,14 +349,16 @@ void ThreadUtility::priv_data::doPeriodicTasks()
                auto currentTime = std::chrono::system_clock::now();
                TimePoint nextOperationTime = currentTime + this->maxDelay;
                {
-                       std::lock_guard<std::mutex> lock(this->job_mutex);
+                       std::lock_guard<std::recursive_mutex> 
lock(this->job_mutex);
                        if (this->jobs.empty())
                                break;
                        for (auto& item : this->jobs)
                        {
                                if (this->terminated)
                                        return;
-                               if (item.nextRun <= currentTime)
+                               if (item.removed)
+                                       ;
+                               else if (item.nextRun <= currentTime)
                                {
                                        try
                                        {
@@ -380,13 +383,13 @@ void ThreadUtility::priv_data::doPeriodicTasks()
                                        nextOperationTime = item.nextRun;
                        }
                }
-               // Remove faulty tasks
+               // Delete removed and faulty tasks
                while (1)
                {
-                       std::lock_guard<std::mutex> lock(this->job_mutex);
+                       std::lock_guard<std::recursive_mutex> 
lock(this->job_mutex);
                        auto pItem = std::find_if(this->jobs.begin(), 
this->jobs.end()
                                , [this](const NamedPeriodicFunction& item)
-                               { return this->retryCount < item.errorCount; }
+                               { return item.removed || this->retryCount < 
item.errorCount; }
                                );
                        if (this->jobs.end() == pItem)
                                break;
diff --git a/src/main/include/log4cxx/net/socketappenderskeleton.h b/src/main/include/log4cxx/net/socketappenderskeleton.h
index c0edada5..0a166738 100644
--- a/src/main/include/log4cxx/net/socketappenderskeleton.h
+++ b/src/main/include/log4cxx/net/socketappenderskeleton.h
@@ -164,7 +164,7 @@ class LOG4CXX_EXPORT SocketAppenderSkeleton : public AppenderSkeleton
                     connection is droppped.
                     */
 
-               void monitor();
+               void retryConnect();
                bool is_closed();
                SocketAppenderSkeleton(const SocketAppenderSkeleton&);
                SocketAppenderSkeleton& operator=(const 
SocketAppenderSkeleton&);
diff --git a/src/main/include/log4cxx/private/aprsocket.h b/src/main/include/log4cxx/private/aprsocket.h
index 4b6e6dd7..9c566af6 100644
--- a/src/main/include/log4cxx/private/aprsocket.h
+++ b/src/main/include/log4cxx/private/aprsocket.h
@@ -41,6 +41,8 @@ class LOG4CXX_EXPORT APRSocket : public helpers::Socket
                /** Closes this socket. */
                virtual void close();
 
+               apr_socket_t* getSocketPtr() const;
+
        private:
                struct APRSocketPriv;
 };
diff --git a/src/main/include/log4cxx/private/socketappenderskeleton_priv.h b/src/main/include/log4cxx/private/socketappenderskeleton_priv.h
index 64ef6c35..9ed67f41 100644
--- a/src/main/include/log4cxx/private/socketappenderskeleton_priv.h
+++ b/src/main/include/log4cxx/private/socketappenderskeleton_priv.h
@@ -20,10 +20,7 @@
 #include <log4cxx/net/socketappenderskeleton.h>
 #include <log4cxx/private/appenderskeleton_priv.h>
 #include <log4cxx/helpers/inetaddress.h>
-
-#if LOG4CXX_EVENTS_AT_EXIT
-#include <log4cxx/private/atexitregistry.h>
-#endif
+#include <log4cxx/helpers/threadutility.h>
 
 namespace LOG4CXX_NS
 {
@@ -39,9 +36,6 @@ struct SocketAppenderSkeleton::SocketAppenderSkeletonPriv : public AppenderSkele
                port(defaultPort),
                reconnectionDelay(reconnectionDelay),
                locationInfo(false)
-#if LOG4CXX_EVENTS_AT_EXIT
-               , atExitRegistryRaii([this]{stopMonitor();})
-#endif
         { }
 
        SocketAppenderSkeletonPriv(helpers::InetAddressPtr address, int 
defaultPort, int reconnectionDelay) :
@@ -51,9 +45,6 @@ struct SocketAppenderSkeleton::SocketAppenderSkeletonPriv : public AppenderSkele
                port(defaultPort),
                reconnectionDelay(reconnectionDelay),
                locationInfo(false)
-#if LOG4CXX_EVENTS_AT_EXIT
-               , atExitRegistryRaii([this]{stopMonitor();})
-#endif
         { }
 
        SocketAppenderSkeletonPriv(const LogString& host, int port, int delay) :
@@ -63,9 +54,6 @@ struct SocketAppenderSkeleton::SocketAppenderSkeletonPriv : public AppenderSkele
                port(port),
                reconnectionDelay(delay),
                locationInfo(false)
-#if LOG4CXX_EVENTS_AT_EXIT
-               , atExitRegistryRaii([this]{stopMonitor();})
-#endif
        { }
 
        ~SocketAppenderSkeletonPriv()
@@ -84,15 +72,22 @@ struct SocketAppenderSkeleton::SocketAppenderSkeletonPriv : public AppenderSkele
        int port;
        int reconnectionDelay;
        bool locationInfo;
+#if LOG4CXX_ABI_VERSION <= 15
        std::thread thread;
        std::condition_variable interrupt;
        std::mutex interrupt_mutex;
-
-#if LOG4CXX_EVENTS_AT_EXIT
-       helpers::AtExitRegistry::Raii atExitRegistryRaii;
 #endif
-
        void stopMonitor();
+
+       /**
+       Manages asynchronous reconnection attempts.
+       */
+       helpers::ThreadUtility::ManagerWeakPtr taskManager;
+
+       /**
+       The reconnection task name.
+       */
+       LogString taskName;
 };
 
 } // namespace net
diff --git a/src/test/cpp/net/CMakeLists.txt b/src/test/cpp/net/CMakeLists.txt
index 555a0cf9..7d43f790 100644
--- a/src/test/cpp/net/CMakeLists.txt
+++ b/src/test/cpp/net/CMakeLists.txt
@@ -20,6 +20,7 @@ if(LOG4CXX_NETWORKING_SUPPORT)
     set(NET_TESTS
        syslogappendertestcase
        telnetappendertestcase
+       socketappendertestcase
        xmlsocketappendertestcase
     )
 else()
diff --git a/src/test/cpp/net/socketappendertestcase.cpp b/src/test/cpp/net/socketappendertestcase.cpp
index 2271930f..14e2674b 100644
--- a/src/test/cpp/net/socketappendertestcase.cpp
+++ b/src/test/cpp/net/socketappendertestcase.cpp
@@ -16,10 +16,20 @@
  */
 
 #include "../appenderskeletontestcase.h"
-#include "apr.h"
+#include <log4cxx/patternlayout.h>
+#include <log4cxx/basicconfigurator.h>
+#include <log4cxx/net/xmlsocketappender.h>
+#include <log4cxx/helpers/serversocket.h>
+#include <log4cxx/helpers/loglog.h>
+#include <log4cxx/helpers/stringhelper.h>
+#include <log4cxx/private/aprsocket.h>
+#include <apr_network_io.h>
 
-using namespace log4cxx;
-using namespace log4cxx::helpers;
+namespace LOG4CXX_NS { namespace net {
+       using SocketAppender = XMLSocketAppender;
+} }
+
+using namespace LOG4CXX_NS;
 
 /**
    Unit tests of log4cxx::SocketAppender
@@ -32,61 +42,111 @@ class SocketAppenderTestCase : public AppenderSkeletonTestCase
                //
                LOGUNIT_TEST(testDefaultThreshold);
                LOGUNIT_TEST(testSetOptionThreshold);
-               LOGUNIT_TEST(testInvalidHost);
+               LOGUNIT_TEST(testRetryConnect);
 
                LOGUNIT_TEST_SUITE_END();
 
 
        public:
 
-               void setUp()
-               {
-               }
-
-               void tearDown()
-               {
-                       BasicConfigurator::resetConfiguration();
-               }
-
                AppenderSkeleton* createAppenderSkeleton() const
                {
                        return new log4cxx::net::SocketAppender();
                }
 
-               void testInvalidHost(){
-//                     log4cxx::net::SocketAppenderPtr appender = 
std::make_shared<log4cxx::net::SocketAppender>();
-//                     log4cxx::PatternLayoutPtr layout = 
std::make_shared<log4cxx::PatternLayout>(LOG4CXX_STR("%m%n"));
-
-//                     log4cxx::helpers::ServerSocket serverSocket(4445);
-
-//                     appender->setLayout(layout);
-//                     appender->setRemoteHost(LOG4CXX_STR("localhost"));
-//                     appender->setReconnectionDelay(1);
-//                     appender->setPort(4445);
-//                     log4cxx::helpers::Pool pool;
-//                     appender->activateOptions(pool);
-
-//                     BasicConfigurator::configure(appender);
-
-//                     
log4cxx::Logger::getRootLogger()->setLevel(log4cxx::Level::getAll());
-
-//                     std::thread th1( [](){
-//                             for( int x = 0; x < 3000; x++ ){
-//                                     
LOG4CXX_INFO(Logger::getLogger(LOG4CXX_STR("test")), "Some message" );
-//                             }
-//                     });
-//                     std::thread th2( [](){
-//                             for( int x = 0; x < 3000; x++ ){
-//                                     
LOG4CXX_INFO(Logger::getLogger(LOG4CXX_STR("test")), "Some message" );
-//                             }
-//                     });
-
-//                     SocketPtr incomingSocket = serverSocket.accept();
-//                     incomingSocket->close();
-
-//                     // If we do not get here, we have deadlocked
-//                     th1.join();
-//                     th2.join();
+               void testRetryConnect()
+               {
+                       int tcpPort = 44445;
+                       auto appender = std::make_shared<net::SocketAppender>();
+                       
appender->setLayout(std::make_shared<log4cxx::PatternLayout>(LOG4CXX_STR("%d 
[%T] %m%n")));
+                       appender->setRemoteHost(LOG4CXX_STR("localhost"));
+                       appender->setReconnectionDelay(100); // milliseconds
+                       appender->setPort(tcpPort);
+                       helpers::Pool pool;
+                       appender->activateOptions(pool);
+
+                       BasicConfigurator::configure(appender);
+                       auto logger = Logger::getLogger("test");
+                       int logEventCount = 3000;
+                       auto doLogging = [logger, logEventCount]()
+                       {
+                               apr_sleep(50000);    // 50 millisecond
+                               for( int x = 0; x < logEventCount; x++ ){
+                                       LOG4CXX_INFO(logger, "Message " << x);
+                               }
+                       };
+                       std::vector<std::thread> loggingThread;
+                       for (auto i: {0, 1})
+                               loggingThread.emplace_back(doLogging);
+
+                       auto serverSocket = 
helpers::ServerSocket::create(tcpPort);
+                       serverSocket->setSoTimeout(1000); // milliseconds
+                       helpers::SocketPtr incomingSocket;
+                       try
+                       {
+                               incomingSocket = serverSocket->accept();
+                       }
+                       catch (std::exception& )
+                       {
+                               LOGUNIT_FAIL("accept failed");
+                       }
+                       auto aprSocket = 
dynamic_pointer_cast<helpers::APRSocket>(incomingSocket);
+                       LOGUNIT_ASSERT(aprSocket);
+                       auto pSocket = aprSocket->getSocketPtr();
+                       LOGUNIT_ASSERT(pSocket);
+                       apr_socket_timeout_set(pSocket, 100000);    // 100 
millisecond
+                       std::vector<int> messageCount;
+                       char buffer[8*1024];
+                       apr_size_t len = sizeof(buffer);
+                       while (APR_SUCCESS == apr_socket_recv(pSocket, buffer, 
&len))
+                       {
+                               auto pStart = &buffer[0];
+                               auto pEnd = buffer + len;
+                               for (auto pChar = pStart; pChar < pEnd; ++pChar)
+                               {
+                                       if ('\n' == *pChar)
+                                       {
+                                               std::string line(pStart, pChar);
+                                               auto pos = line.rfind(' ');
+                                               if (line.npos != pos && pos + 1 
< line.size())
+                                               {
+                                                       try
+                                                       {
+                                                               auto msgNumber 
= std::stoi(line.substr(pos));
+                                                               if 
(messageCount.size() <= msgNumber)
+                                                                       
messageCount.resize(msgNumber + 1);
+                                                               
++messageCount[msgNumber];
+                                                       }
+                                                       catch (std::exception 
const& ex)
+                                                       {
+                                                               LogString msg;
+                                                               
helpers::Transcoder::decode(ex.what(), msg);
+                                                               msg += 
LOG4CXX_STR(" processing\n");
+                                                               
helpers::Transcoder::decode(line, msg);
+                                                               
helpers::LogLog::warn(msg);
+                                                       }
+                                               }
+                                               pStart = pChar + 1;
+                                       }
+                               }
+                               len = sizeof(buffer);
+                       }
+                       incomingSocket->close();
+                       for (auto& t : loggingThread)
+                               t.join();
+
+                       if (helpers::LogLog::isDebugEnabled())
+                       {
+                               helpers::Pool p;
+                               LogString msg(LOG4CXX_STR("messageCount "));
+                               for (auto item : messageCount)
+                               {
+                                       msg += logchar(' ');
+                                       helpers::StringHelper::toString(item, 
p, msg);
+                               }
+                               helpers::LogLog::debug(msg);
+                       }
+                       LOGUNIT_ASSERT_EQUAL(logEventCount, 
(int)messageCount.size());
                }
 };
 

Reply via email to