comphelper/CppunitTest_comphelper_threadpool_test.mk |    1 
 comphelper/qa/unit/threadpooltest.cxx                |   50 +++++++++++++++++++
 comphelper/source/misc/threadpool.cxx                |   24 ++++++++-
 include/comphelper/threadpool.hxx                    |    7 +-
 4 files changed, 78 insertions(+), 4 deletions(-)

New commits:
commit 1400114a69ef4b946f66e0b9af2ab20299478a3e
Author:     Luboš Luňák <l.lu...@collabora.com>
AuthorDate: Mon Oct 5 12:14:34 2020 +0200
Commit:     Michael Meeks <michael.me...@collabora.com>
CommitDate: Tue Oct 6 12:37:17 2020 +0200

    fix allocating thread pool workers
    
    Tasks are removed from the queue before a worker starts working
    on them, so maTasks.size() is not the number of tasks still to do:
    the tasks currently being worked on are no longer counted there.
    As a result the code could spawn fewer workers than were needed
    (and fewer than the CPU cores that are available).
    
    Change-Id: Ic6e6a79316cf48d82f2b80be7ad477b723b2c4e5
    Reviewed-on: https://gerrit.libreoffice.org/c/core/+/103955
    Tested-by: Jenkins
    Reviewed-by: Luboš Luňák <l.lu...@collabora.com>
    (cherry picked from commit 2ad4e77a0f266ae6e6fccaebb1d080d2880bdac3)
    Reviewed-on: https://gerrit.libreoffice.org/c/core/+/103972
    Tested-by: Jenkins CollaboraOffice <jenkinscollaboraoff...@gmail.com>
    Reviewed-by: Michael Meeks <michael.me...@collabora.com>
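
To illustrate the reasoning in the commit message, here is a minimal standalone
sketch of the spawn decision being fixed. It is not the LibreOffice code itself;
names such as SimplePool, beginTask and endTask are made up for the example. The
point is that a task being executed has already been popped from maTasks, so the
spawn check must add the busy-worker count to the queue size, otherwise the pool
stops growing before all cores are used.

#include <cstddef>
#include <deque>
#include <functional>
#include <iostream>

class SimplePool
{
public:
    explicit SimplePool(std::size_t nMaxWorkers) : mnMaxWorkers(nMaxWorkers) {}

    void pushTask(std::function<void()> task)
    {
        maTasks.push_back(std::move(task));
        // Old (buggy) check: mnWorkers <= maTasks.size().
        // Fixed check: also count the tasks that are already being worked on.
        if (mnWorkers < mnMaxWorkers && mnWorkers <= maTasks.size() + mnBusyWorkers)
            ++mnWorkers; // stands in for "spawn another worker thread"
    }

    // A worker calls these around the execution of one task.
    void beginTask()
    {
        maTasks.pop_front(); // the task leaves the queue before it is run
        ++mnBusyWorkers;
    }
    void endTask() { --mnBusyWorkers; }

    std::size_t workerCount() const { return mnWorkers; }

private:
    std::size_t const mnMaxWorkers;
    std::size_t mnWorkers = 0;
    std::size_t mnBusyWorkers = 0;
    std::deque<std::function<void()>> maTasks;
};

int main()
{
    SimplePool pool(4);
    for (int i = 0; i < 4; ++i)
    {
        pool.pushTask([] { /* long-running work */ });
        pool.beginTask(); // pretend a free worker immediately grabbed the task
    }
    std::cout << "workers spawned: " << pool.workerCount() << '\n'; // prints 4
}

With the old check (maWorkers.size() <= maTasks.size()) the example above would
stop at two workers; with the busy-worker count included it reaches the full
capacity of four.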

diff --git a/comphelper/CppunitTest_comphelper_threadpool_test.mk b/comphelper/CppunitTest_comphelper_threadpool_test.mk
index 16bbd6fff69b..24467c898f80 100644
--- a/comphelper/CppunitTest_comphelper_threadpool_test.mk
+++ b/comphelper/CppunitTest_comphelper_threadpool_test.mk
@@ -24,6 +24,7 @@ $(eval $(call gb_CppunitTest_use_libraries,comphelper_threadpool_test, \
     cppuhelper \
     cppu \
     sal \
+    tl \
 ))
 
 # vim: set noet sw=4 ts=4:
diff --git a/comphelper/qa/unit/threadpooltest.cxx b/comphelper/qa/unit/threadpooltest.cxx
index 8148cce941e8..141b91d03634 100644
--- a/comphelper/qa/unit/threadpooltest.cxx
+++ b/comphelper/qa/unit/threadpooltest.cxx
@@ -12,17 +12,22 @@
 #include <cppunit/TestFixture.h>
 #include <cppunit/extensions/HelperMacros.h>
 #include <cppunit/plugin/TestPlugIn.h>
+#include <tools/time.hxx>
+#include <osl/thread.hxx>
 
 #include <stdlib.h>
 #include <thread>
+#include <mutex>
 
 class ThreadPoolTest : public CppUnit::TestFixture
 {
 public:
     void testPreferredConcurrency();
+    void testWorkerUsage();
 
     CPPUNIT_TEST_SUITE(ThreadPoolTest);
     CPPUNIT_TEST(testPreferredConcurrency);
+    CPPUNIT_TEST(testWorkerUsage);
     CPPUNIT_TEST_SUITE_END();
 };
 
@@ -48,6 +53,51 @@ void ThreadPoolTest::testPreferredConcurrency() {
 #endif
 }
 
+namespace
+{
+class UsageTask : public comphelper::ThreadTask
+{
+public:
+    UsageTask(const std::shared_ptr<comphelper::ThreadTaskTag>& pTag)
+        : ThreadTask(pTag)
+    {
+    }
+    virtual void doWork()
+    {
+        ++count;
+        mutex.lock();
+        mutex.unlock();
+    }
+    static inline int count = 0;
+    static inline std::mutex mutex;
+};
+} // namespace
+
+void ThreadPoolTest::testWorkerUsage()
+{
+    // Create tasks for each available worker. Lock a shared mutex before that to make all
+    // tasks block on it. And check that all workers have started, i.e. that the full
+    // thread pool capacity is used.
+    comphelper::ThreadPool& rSharedPool = comphelper::ThreadPool::getSharedOptimalPool();
+    std::shared_ptr<comphelper::ThreadTaskTag> pTag = comphelper::ThreadPool::createThreadTaskTag();
+    UsageTask::mutex.lock();
+    for (int i = 0; i < rSharedPool.getWorkerCount(); ++i)
+    {
+        rSharedPool.pushTask(std::make_unique<UsageTask>(pTag));
+        osl::Thread::wait(std::chrono::milliseconds(10)); // give it a time to start
+    }
+    sal_uInt64 startTicks = tools::Time::GetSystemTicks();
+    while (UsageTask::count != rSharedPool.getWorkerCount())
+    {
+        // Wait at most 5 seconds, that should do even on slow systems.
+        CPPUNIT_ASSERT_MESSAGE("Thread pool does not use all worker threads.",
+                               startTicks + 5000 > tools::Time::GetSystemTicks());
+        osl::Thread::wait(std::chrono::milliseconds(10));
+    }
+    UsageTask::mutex.unlock();
+    rSharedPool.waitUntilDone(pTag);
+}
+
 CPPUNIT_TEST_SUITE_REGISTRATION(ThreadPoolTest);
 
 CPPUNIT_PLUGIN_IMPLEMENT();
diff --git a/comphelper/source/misc/threadpool.cxx b/comphelper/source/misc/threadpool.cxx
index 95b6f2dff091..3de17925a4e3 100644
--- a/comphelper/source/misc/threadpool.cxx
+++ b/comphelper/source/misc/threadpool.cxx
@@ -78,12 +78,14 @@ public:
             std::unique_ptr<ThreadTask> pTask = mpPool->popWorkLocked( aGuard, true );
             if( pTask )
             {
+                mpPool->incBusyWorker();
                 aGuard.unlock();
 
                 pTask->exec();
                 pTask.reset();
 
                 aGuard.lock();
+                mpPool->decBusyWorker();
             }
         }
     }
@@ -91,7 +93,8 @@ public:
 
 ThreadPool::ThreadPool(sal_Int32 nWorkers)
     : mbTerminate(true)
-    , mnWorkers(nWorkers)
+    , mnMaxWorkers(nWorkers)
+    , mnBusyWorkers(0)
 {
 }
 
@@ -103,6 +106,7 @@ ThreadPool::~ThreadPool()
     // still 0, but hopefully they will be more helpful on non-WNT platforms
     assert(mbTerminate);
     assert(maTasks.empty());
+    assert(mnBusyWorkers == 0);
 }
 
 struct ThreadPoolStatic : public rtl::StaticWithInit< std::shared_ptr< ThreadPool >,
@@ -193,7 +197,8 @@ void ThreadPool::pushTask( std::unique_ptr<ThreadTask> pTask )
 
     mbTerminate = false;
 
-    if (maWorkers.size() < mnWorkers && maWorkers.size() <= maTasks.size())
+    // Worked on tasks are already removed from maTasks, so include the count of busy workers.
+    if (maWorkers.size() < mnMaxWorkers && maWorkers.size() <= maTasks.size() + mnBusyWorkers)
     {
         maWorkers.push_back( new ThreadWorker( this ) );
         maWorkers.back()->launch();
@@ -225,6 +230,17 @@ std::unique_ptr<ThreadTask> ThreadPool::popWorkLocked( std::unique_lock< std::mu
     return nullptr;
 }
 
+void ThreadPool::incBusyWorker()
+{
+    ++mnBusyWorkers;
+}
+
+void ThreadPool::decBusyWorker()
+{
+    assert(mnBusyWorkers >= 1);
+    --mnBusyWorkers;
+}
+
 void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag, bool bJoinAll)
 {
 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
@@ -286,6 +302,10 @@ void ThreadTask::exec()
     {
         SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e);
     }
+    catch (...)
+    {
+        SAL_WARN("comphelper", "unknown exception in thread worker while calling doWork()");
+    }
 
     pTag->onTaskWorkerDone();
 }
diff --git a/include/comphelper/threadpool.hxx b/include/comphelper/threadpool.hxx
index 1cb9441cfdd1..f51daf4f70a4 100644
--- a/include/comphelper/threadpool.hxx
+++ b/include/comphelper/threadpool.hxx
@@ -72,7 +72,7 @@ public:
     void        joinAll();
 
     /// return the number of live worker threads
-    sal_Int32   getWorkerCount() const { return mnWorkers; }
+    sal_Int32   getWorkerCount() const { return mnMaxWorkers; }
 
     /// wait until all work is completed, then join all threads
     void        shutdown();
@@ -90,12 +90,15 @@ private:
     */
     std::unique_ptr<ThreadTask> popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait );
     void shutdownLocked(std::unique_lock<std::mutex>&);
+    void incBusyWorker();
+    void decBusyWorker();
 
     /// signalled when all in-progress tasks are complete
     std::mutex              maMutex;
     std::condition_variable maTasksChanged;
     bool                    mbTerminate;
-    std::size_t const       mnWorkers;
+    std::size_t const       mnMaxWorkers;
+    std::size_t             mnBusyWorkers;
     std::vector< std::unique_ptr<ThreadTask> >   maTasks;
     std::vector< rtl::Reference< ThreadWorker > > maWorkers;
 };
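
For reference, here is a rough usage sketch of the API touched above, based only
on the declarations in this header and on the calls the new unit test makes
(getSharedOptimalPool, createThreadTaskTag, pushTask, waitUntilDone,
getWorkerCount). It is a sketch, not code from the repository; the CountTask
class and runTasks function are invented for the example and error handling is
omitted.

#include <comphelper/threadpool.hxx>
#include <atomic>
#include <memory>

namespace
{
std::atomic<int> g_nDone{0};

class CountTask : public comphelper::ThreadTask
{
public:
    explicit CountTask(const std::shared_ptr<comphelper::ThreadTaskTag>& pTag)
        : ThreadTask(pTag)
    {
    }
    virtual void doWork() override { ++g_nDone; }
};
}

void runTasks()
{
    comphelper::ThreadPool& rPool = comphelper::ThreadPool::getSharedOptimalPool();
    std::shared_ptr<comphelper::ThreadTaskTag> pTag
        = comphelper::ThreadPool::createThreadTaskTag();
    for (int i = 0; i < 2 * rPool.getWorkerCount(); ++i)
        rPool.pushTask(std::make_unique<CountTask>(pTag));
    // Blocks until every task pushed with pTag has completed.
    rPool.waitUntilDone(pTag);
}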