 comphelper/source/misc/threadpool.cxx     |  266 +++++++++++++-----------------
 desktop/source/app/app.cxx                |    4
 include/comphelper/threadpool.hxx         |   47 +++--
 package/source/zipapi/ZipOutputStream.cxx |    1
 4 files changed, 152 insertions(+), 166 deletions(-)
New commits: commit aa68c99d88fd7abe08c4aee5206c859a0cdba38e Author: Michael Meeks <michael.me...@collabora.com> Date: Thu Dec 1 11:14:24 2016 +0000 tdf#104126 - comphelper thread-pool, use reliable std::condition_variable. The existing osl::Condition is an API and reliability disaster area. Change-Id: I3be84e1c6a83e58c43c40c9c8720790d923a6694 Reviewed-on: https://gerrit.libreoffice.org/31163 Tested-by: Jenkins <c...@libreoffice.org> Reviewed-by: Michael Meeks <michael.me...@collabora.com> Tested-by: Michael Meeks <michael.me...@collabora.com> diff --git a/comphelper/source/misc/threadpool.cxx b/comphelper/source/misc/threadpool.cxx index 0fda264..6329143 100644 --- a/comphelper/source/misc/threadpool.cxx +++ b/comphelper/source/misc/threadpool.cxx @@ -10,11 +10,14 @@ #include <comphelper/threadpool.hxx> #include <com/sun/star/uno/Exception.hpp> +#include <sal/config.h> #include <rtl/instance.hxx> #include <rtl/string.hxx> +#include <salhelper/thread.hxx> #include <algorithm> #include <memory> #include <thread> +#include <chrono> namespace comphelper { @@ -26,30 +29,27 @@ static thread_local bool gbIsWorkerThread; // used to group thread-tasks for waiting in waitTillDone() class COMPHELPER_DLLPUBLIC ThreadTaskTag { - osl::Mutex mMutex; - std::size_t mnTasksWorking; - osl::Condition maTasksComplete; + std::mutex maMutex; + sal_Int32 mnTasksWorking; + std::condition_variable maTasksComplete; public: ThreadTaskTag(); - bool isDone(); - void waitUntilDone(); - void onTaskWorkerDone(); - void onTaskPushed(); + bool isDone(); + void waitUntilDone(); + void onTaskWorkerDone(); + void onTaskPushed(); }; class ThreadPool::ThreadWorker : public salhelper::Thread { - ThreadPool *mpPool; - osl::Condition maNewWork; - bool mbWorking; + ThreadPool *mpPool; public: explicit ThreadWorker( ThreadPool *pPool ) : salhelper::Thread("thread-pool"), - mpPool( pPool ), - mbWorking( false ) + mpPool( pPool ) { } @@ -58,74 +58,20 @@ public: #if defined DBG_UTIL && defined LINUX 
gbIsWorkerThread = true; #endif - while ( ThreadTask * pTask = waitForWork() ) - { - std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag()); - try { - pTask->doWork(); - } - catch (const std::exception &e) - { - SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what()); - } - catch (const css::uno::Exception &e) - { - SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.Message); - } - delete pTask; - pTag->onTaskWorkerDone(); - } - } - - ThreadTask *waitForWork() - { - ThreadTask *pRet = nullptr; + std::unique_lock< std::mutex > aGuard( mpPool->maMutex ); - osl::ResettableMutexGuard aGuard( mpPool->maGuard ); - - pRet = mpPool->popWork(); - - while( !pRet ) + while( !mpPool->mbTerminate ) { - if (mbWorking) - mpPool->stopWork(); - mbWorking = false; - maNewWork.reset(); - - if( mpPool->mbTerminate ) - break; - - aGuard.clear(); // unlock - - maNewWork.wait(); - - aGuard.reset(); // lock + ThreadTask *pTask = mpPool->popWorkLocked( aGuard, true ); + if( pTask ) + { + aGuard.unlock(); - pRet = mpPool->popWork(); - } + pTask->execAndDelete(); - if (pRet) - { - if (!mbWorking) - mpPool->startWork(); - mbWorking = true; + aGuard.lock(); + } } - - return pRet; - } - - // Why a condition per worker thread - you may ask. - // - // Unfortunately the Windows synchronisation API that we wrap - // is horribly inadequate cf. - // http://www.cs.wustl.edu/~schmidt/win32-cv-1.html - // The existing osl::Condition API should only ever be used - // between one producer and one consumer thread to avoid the - // lost wakeup problem. 
- - void signalNewWork() - { - maNewWork.set(); } }; @@ -133,19 +79,18 @@ ThreadPool::ThreadPool( sal_Int32 nWorkers ) : mnThreadsWorking( 0 ), mbTerminate( false ) { + std::unique_lock< std::mutex > aGuard( maMutex ); + for( sal_Int32 i = 0; i < nWorkers; i++ ) maWorkers.push_back( new ThreadWorker( this ) ); - maTasksComplete.set(); - - osl::MutexGuard aGuard( maGuard ); for(rtl::Reference<ThreadWorker> & rpWorker : maWorkers) rpWorker->launch(); } ThreadPool::~ThreadPool() { - waitAndCleanupWorkers(); + shutdown(); } struct ThreadPoolStatic : public rtl::StaticWithInit< std::shared_ptr< ThreadPool >, @@ -183,100 +128,108 @@ sal_Int32 ThreadPool::getPreferredConcurrency() return ThreadCount; } -void ThreadPool::waitAndCleanupWorkers() +// FIXME: there should be no need for this as/when our baseline +// is >VS2015 and drop WinXP; the sorry details are here: +// https://connect.microsoft.com/VisualStudio/feedback/details/1282596 +void ThreadPool::shutdown() { - osl::ResettableMutexGuard aGuard( maGuard ); + if (mbTerminate) + return; + + std::unique_lock< std::mutex > aGuard( maMutex ); if( maWorkers.empty() ) { // no threads at all -> execute the work in-line - while ( ThreadTask * pTask = popWork() ) - { - std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag()); - pTask->doWork(); - delete pTask; - pTag->onTaskWorkerDone(); - } + ThreadTask *pTask; + while ( ( pTask = popWorkLocked(aGuard, false) ) ) + pTask->execAndDelete(); } else { - aGuard.clear(); - maTasksComplete.wait(); - aGuard.reset(); + while( !maTasks.empty() ) + maTasksChanged.wait( aGuard ); } assert( maTasks.empty() ); mbTerminate = true; + maTasksChanged.notify_all(); + while( !maWorkers.empty() ) { rtl::Reference< ThreadWorker > xWorker = maWorkers.back(); maWorkers.pop_back(); assert(std::find(maWorkers.begin(), maWorkers.end(), xWorker) == maWorkers.end()); - xWorker->signalNewWork(); - aGuard.clear(); - { // unlocked + aGuard.unlock(); + { xWorker->join(); xWorker.clear(); } - aGuard.reset(); + 
aGuard.lock(); } } void ThreadPool::pushTask( ThreadTask *pTask ) { - osl::MutexGuard aGuard( maGuard ); + std::unique_lock< std::mutex > aGuard( maMutex ); + pTask->mpTag->onTaskPushed(); maTasks.insert( maTasks.begin(), pTask ); - // horrible beyond belief: - for(rtl::Reference<ThreadWorker> & rpWorker : maWorkers) - rpWorker->signalNewWork(); - maTasksComplete.reset(); + maTasksChanged.notify_one(); } -ThreadTask *ThreadPool::popWork() +ThreadTask *ThreadPool::popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait ) { - if( !maTasks.empty() ) + do { - ThreadTask *pTask = maTasks.back(); - maTasks.pop_back(); - return pTask; - } - else - return nullptr; + if( !maTasks.empty() ) + { + ThreadTask *pTask = maTasks.back(); + maTasks.pop_back(); + return pTask; + } + else if (!bWait || mbTerminate) + return nullptr; + + maTasksChanged.wait( rGuard ); + + } while (!mbTerminate); + + return nullptr; } -void ThreadPool::startWork() +void ThreadPool::startWorkLocked() { mnThreadsWorking++; } -void ThreadPool::stopWork() +void ThreadPool::stopWorkLocked() { assert( mnThreadsWorking > 0 ); if ( --mnThreadsWorking == 0 ) - maTasksComplete.set(); + maTasksChanged.notify_all(); } + void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag) { #if defined DBG_UTIL && defined LINUX assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task"); #endif - osl::ResettableMutexGuard aGuard( maGuard ); - - if( maWorkers.empty() ) - { // no threads at all -> execute the work in-line - while ( ThreadTask * pTask = popWork() ) - { - std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag()); - pTask->doWork(); - delete pTask; - pTag->onTaskWorkerDone(); + { + std::unique_lock< std::mutex > aGuard( maMutex ); + + if( maWorkers.empty() ) + { // no threads at all -> execute the work in-line + ThreadTask *pTask; + while (!rTag->isDone() && + ( pTask = popWorkLocked(aGuard, false) ) ) + pTask->execAndDelete(); } } - aGuard.clear(); + rTag->waitUntilDone(); } @@ 
-290,54 +243,73 @@ bool ThreadPool::isTaskTagDone(const std::shared_ptr<ThreadTaskTag>& pTag) return pTag->isDone(); } - ThreadTask::ThreadTask(const std::shared_ptr<ThreadTaskTag>& pTag) : mpTag(pTag) { } +void ThreadTask::execAndDelete() +{ + std::shared_ptr<ThreadTaskTag> pTag(mpTag); + try { + doWork(); + } + catch (const std::exception &e) + { + SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what()); + } + catch (const css::uno::Exception &e) + { + SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.Message); + } + + delete this; + pTag->onTaskWorkerDone(); +} + ThreadTaskTag::ThreadTaskTag() : mnTasksWorking(0) { - maTasksComplete.set(); } void ThreadTaskTag::onTaskPushed() { - osl::MutexGuard g(mMutex); - assert( mnTasksWorking < 65535 ); // sanity checking - ++mnTasksWorking; - maTasksComplete.reset(); + std::unique_lock< std::mutex > aGuard( maMutex ); + mnTasksWorking++; + assert( mnTasksWorking < 65536 ); // sanity checking } void ThreadTaskTag::onTaskWorkerDone() { - osl::MutexGuard g(mMutex); - assert(mnTasksWorking > 0); - --mnTasksWorking; + std::unique_lock< std::mutex > aGuard( maMutex ); + mnTasksWorking--; + assert(mnTasksWorking >= 0); if (mnTasksWorking == 0) - maTasksComplete.set(); + maTasksComplete.notify_all(); } -void ThreadTaskTag::waitUntilDone() +bool ThreadTaskTag::isDone() { -#if defined DBG_UTIL && defined LINUX - assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task"); -#endif + std::unique_lock< std::mutex > aGuard( maMutex ); + return mnTasksWorking == 0; +} +void ThreadTaskTag::waitUntilDone() +{ + std::unique_lock< std::mutex > aGuard( maMutex ); + while( mnTasksWorking > 0 ) + { #ifdef DBG_UTIL - // 3 minute timeout in debug mode so our tests fail sooner rather than later - osl::Condition::Result rv = maTasksComplete.wait(TimeValue { 3*60, 0 }); - assert(rv != osl::Condition::result_timeout); + // 3 minute timeout in debug mode so our tests fail 
sooner rather than later + std::cv_status result = maTasksComplete.wait_for( + aGuard, std::chrono::seconds( 3 * 60 )); + assert(result != std::cv_status::timeout); #else - // 10 minute timeout in production so the app eventually throws some kind of error - if (maTasksComplete.wait(TimeValue { 10*60, 0 }) == osl::Condition::Result::result_timeout) - throw std::runtime_error("timeout waiting for threadpool tasks"); + // 10 minute timeout in production so the app eventually throws some kind of error + if (maTasksComplete.wait_for( + aGuard, std::chrono::seconds( 10 * 60 )) == std::cv_status::timeout) + throw std::runtime_error("timeout waiting for threadpool tasks"); #endif -} - -bool ThreadTaskTag::isDone() -{ - return mnTasksWorking == 0; + } } } // namespace comphelper diff --git a/desktop/source/app/app.cxx b/desktop/source/app/app.cxx index 501ebe1..9a68158 100644 --- a/desktop/source/app/app.cxx +++ b/desktop/source/app/app.cxx @@ -81,6 +81,7 @@ #include <toolkit/helper/vclunohelper.hxx> #include <comphelper/configuration.hxx> #include <comphelper/fileurl.hxx> +#include <comphelper/threadpool.hxx> #include <comphelper/processfactory.hxx> #include <comphelper/backupfilehelper.hxx> #include <unotools/bootstrap.hxx> @@ -1791,11 +1792,14 @@ int Desktop::doShutdown() StarBASIC::DetachAllDocBasicItems(); #endif } + // be sure that path/language options gets destroyed before // UCB is deinitialized pExecGlobals->pLanguageOptions.reset( nullptr ); pExecGlobals->pPathOptions.reset( nullptr ); + comphelper::ThreadPool::getSharedOptimalPool().shutdown(); + bool bRR = pExecGlobals->bRestartRequested; delete pExecGlobals; pExecGlobals = nullptr; diff --git a/include/comphelper/threadpool.hxx b/include/comphelper/threadpool.hxx index 7910a83..9f76922 100644 --- a/include/comphelper/threadpool.hxx +++ b/include/comphelper/threadpool.hxx @@ -11,11 +11,11 @@ #define INCLUDED_COMPHELPER_THREADPOOL_HXX #include <sal/config.h> -#include <salhelper/thread.hxx> -#include 
<osl/mutex.hxx> -#include <osl/conditn.hxx> #include <rtl/ref.hxx> #include <comphelper/comphelperdllapi.h> +#include <mutex> +#include <thread> +#include <condition_variable> #include <vector> #include <memory> @@ -28,14 +28,19 @@ class COMPHELPER_DLLPUBLIC ThreadTask { friend class ThreadPool; std::shared_ptr<ThreadTaskTag> mpTag; + + /// execute and delete this task + void execAndDelete(); +protected: + /// override to get your task performed by the pool + virtual void doWork() = 0; + /// once pushed ThreadTasks are destroyed by the pool + virtual ~ThreadTask() {} public: ThreadTask(const std::shared_ptr<ThreadTaskTag>& pTag); - virtual ~ThreadTask() {} - virtual void doWork() = 0; - const std::shared_ptr<ThreadTaskTag>& getTag() { return mpTag; } }; -/// A very basic thread pool implementation +/// A very basic thread-safe thread pool implementation class COMPHELPER_DLLPUBLIC ThreadPool final { public: @@ -50,7 +55,7 @@ public: /// returns a configurable max-concurrency /// limit to avoid spawning an unnecessarily /// large number of threads on high-core boxes. - /// MAX_CONCURRENCY envar controls the cap. + /// MAX_CONCURRENCY env. var. controls the cap. 
static sal_Int32 getPreferredConcurrency(); ThreadPool( sal_Int32 nWorkers ); @@ -65,6 +70,9 @@ public: /// return the number of live worker threads sal_Int32 getWorkerCount() const { return maWorkers.size(); } + /// wait until all work is completed, then join all threads + void shutdown(); + private: ThreadPool(const ThreadPool&) = delete; ThreadPool& operator=(const ThreadPool&) = delete; @@ -72,20 +80,21 @@ private: class ThreadWorker; friend class ThreadWorker; - /// wait until all work is completed, then join all threads - void waitAndCleanupWorkers(); - - ThreadTask *popWork(); - void startWork(); - void stopWork(); + /** Pop a work task + @param bWait - if set wait until task present or termination + @return a new task to perform, or NULL if list empty or terminated + */ + ThreadTask *popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait ); + void startWorkLocked(); + void stopWorkLocked(); - osl::Mutex maGuard; - sal_Int32 mnThreadsWorking; /// signalled when all in-progress tasks are complete - osl::Condition maTasksComplete; - bool mbTerminate; - std::vector< rtl::Reference< ThreadWorker > > maWorkers; + std::mutex maMutex; + std::condition_variable maTasksChanged; + sal_Int32 mnThreadsWorking; + bool mbTerminate; std::vector< ThreadTask * > maTasks; + std::vector< rtl::Reference< ThreadWorker > > maWorkers; }; } // namespace comphelper diff --git a/package/source/zipapi/ZipOutputStream.cxx b/package/source/zipapi/ZipOutputStream.cxx index 603a614..d0fce89 100644 --- a/package/source/zipapi/ZipOutputStream.cxx +++ b/package/source/zipapi/ZipOutputStream.cxx @@ -27,6 +27,7 @@ #include <osl/diagnose.h> #include <osl/time.h> +#include <osl/thread.hxx> #include <PackageConstants.hxx> #include <ZipEntry.hxx> _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits