This is an automated email from the git hooks/post-receive script. It was generated because a ref change was pushed to the repository containing the project "CMake".
The branch, master has been updated via 1ddce8fd6de1a88af6e81f6cfa36b48b7948a53d (commit) via 56890ede2a6eed4db074e3fe6c56e5d03dc42b6e (commit) via 9794b72d38d4aadef352d3ae80d7dee2fbfcb7fb (commit) from 4ccf40e61e02cffb28b549a3de6f4794ea3e3d92 (commit) Those revisions listed above that are new to this repository have not appeared on any other notification email; so we list those revisions in full, below. - Log ----------------------------------------------------------------- https://cmake.org/gitweb?p=cmake.git;a=commitdiff;h=1ddce8fd6de1a88af6e81f6cfa36b48b7948a53d commit 1ddce8fd6de1a88af6e81f6cfa36b48b7948a53d Merge: 4ccf40e 56890ed Author: Kyle Edwards <kyle.edwa...@kitware.com> AuthorDate: Thu Apr 25 19:33:50 2019 +0000 Commit: Kitware Robot <kwro...@kitware.com> CommitDate: Thu Apr 25 15:34:12 2019 -0400 Merge topic 'cmWorkerPool_Tweaks' 56890ede2a cmWorkerPool: Factor our worker thread class (internals) 9794b72d38 cmWorkerPool: Set worker thread count separately to Process() Acked-by: Kitware Robot <kwro...@kitware.com> Merge-request: !3260 https://cmake.org/gitweb?p=cmake.git;a=commitdiff;h=56890ede2a6eed4db074e3fe6c56e5d03dc42b6e commit 56890ede2a6eed4db074e3fe6c56e5d03dc42b6e Author: Sebastian Holtermann <sebh...@xwmw.org> AuthorDate: Wed Apr 24 11:54:56 2019 +0200 Commit: Sebastian Holtermann <sebh...@xwmw.org> CommitDate: Wed Apr 24 12:54:19 2019 +0200 cmWorkerPool: Factor our worker thread class (internals) This moves the `cmWorkerPoolInternal::WorkerT` class to `cmWorkerPoolWorker` and changes the thread start interface to make it independent of the `cmWorkerPoolInternal` type. diff --git a/Source/cmWorkerPool.cxx b/Source/cmWorkerPool.cxx index 75ca47a..cbf070e 100644 --- a/Source/cmWorkerPool.cxx +++ b/Source/cmWorkerPool.cxx @@ -371,138 +371,62 @@ void cmUVReadOnlyProcess::UVTryFinish() } /** - * @brief Private worker pool internals + * @brief Worker pool worker thread */ -class cmWorkerPoolInternal +class cmWorkerPoolWorker { public: - // -- Types - - /** - * @brief Worker thread - */ - class WorkerT - { - public: - WorkerT(unsigned int index); - ~WorkerT(); - - WorkerT(WorkerT const&) = delete; - WorkerT& operator=(WorkerT const&) = delete; - - /** - * Start the thread - */ - void Start(cmWorkerPoolInternal* internal); - - /** - * @brief Run an external process - */ - bool RunProcess(cmWorkerPool::ProcessResultT& result, - std::vector<std::string> const& command, - std::string const& workingDirectory); - - // -- Accessors - unsigned int Index() const { return Index_; } - cmWorkerPool::JobHandleT& JobHandle() { return JobHandle_; } - - private: - // -- Libuv callbacks - static void UVProcessStart(uv_async_t* handle); - void UVProcessFinished(); - - private: - //! @brief Job handle - cmWorkerPool::JobHandleT JobHandle_; - //! @brief Worker index - unsigned int Index_; - // -- Process management - struct - { - std::mutex Mutex; - cm::uv_async_ptr Request; - std::condition_variable Condition; - std::unique_ptr<cmUVReadOnlyProcess> ROP; - } Proc_; - // -- System thread - std::thread Thread_; - }; - -public: - // -- Constructors - cmWorkerPoolInternal(cmWorkerPool* pool); - ~cmWorkerPoolInternal(); + cmWorkerPoolWorker(uv_loop_t& uvLoop); + ~cmWorkerPoolWorker(); - /** - * @brief Runs the libuv loop - */ - bool Process(); - - /** - * @brief Clear queue and abort threads - */ - void Abort(); + cmWorkerPoolWorker(cmWorkerPoolWorker const&) = delete; + cmWorkerPoolWorker& operator=(cmWorkerPoolWorker const&) = delete; /** - * @brief Push a job to the queue and notify a worker + * Set the internal thread */ - bool PushJob(cmWorkerPool::JobHandleT&& jobHandle); + void SetThread(std::thread&& aThread) { Thread_ = std::move(aThread); } /** - * @brief Worker thread main loop method + * Run an external process */ - void Work(WorkerT* worker); + bool RunProcess(cmWorkerPool::ProcessResultT& result, + std::vector<std::string> const& command, + std::string const& workingDirectory); - // -- Request slots - static void UVSlotBegin(uv_async_t* handle); - static void UVSlotEnd(uv_async_t* handle); - -public: - // -- UV loop -#ifdef CMAKE_UV_SIGNAL_HACK - std::unique_ptr<cmUVSignalHackRAII> UVHackRAII; -#endif - std::unique_ptr<uv_loop_t> UVLoop; - cm::uv_async_ptr UVRequestBegin; - cm::uv_async_ptr UVRequestEnd; - - // -- Thread pool and job queue - std::mutex Mutex; - bool Processing = false; - bool Aborting = false; - bool FenceProcessing = false; - unsigned int WorkersRunning = 0; - unsigned int WorkersIdle = 0; - unsigned int JobsProcessing = 0; - std::deque<cmWorkerPool::JobHandleT> Queue; - std::condition_variable Condition; - std::vector<std::unique_ptr<WorkerT>> Workers; +private: + // -- Libuv callbacks + static void UVProcessStart(uv_async_t* handle); + void UVProcessFinished(); - // -- References - cmWorkerPool* Pool = nullptr; +private: + // -- Process management + struct + { + std::mutex Mutex; + cm::uv_async_ptr Request; + std::condition_variable Condition; + std::unique_ptr<cmUVReadOnlyProcess> ROP; + } Proc_; + // -- System thread + std::thread Thread_; }; -cmWorkerPoolInternal::WorkerT::WorkerT(unsigned int index) - : Index_(index) +cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t& uvLoop) { + Proc_.Request.init(uvLoop, &cmWorkerPoolWorker::UVProcessStart, this); } -cmWorkerPoolInternal::WorkerT::~WorkerT() +cmWorkerPoolWorker::~cmWorkerPoolWorker() { if (Thread_.joinable()) { Thread_.join(); } } -void cmWorkerPoolInternal::WorkerT::Start(cmWorkerPoolInternal* internal) -{ - Proc_.Request.init(*(internal->UVLoop), &WorkerT::UVProcessStart, this); - Thread_ = std::thread(&cmWorkerPoolInternal::Work, internal, this); -} - -bool cmWorkerPoolInternal::WorkerT::RunProcess( - cmWorkerPool::ProcessResultT& result, - std::vector<std::string> const& command, std::string const& workingDirectory) +bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT& result, + std::vector<std::string> const& command, + std::string const& workingDirectory) { if (command.empty()) { return false; @@ -525,9 +449,9 @@ bool cmWorkerPoolInternal::WorkerT::RunProcess( return !result.error(); } -void cmWorkerPoolInternal::WorkerT::UVProcessStart(uv_async_t* handle) +void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle) { - auto* wrk = reinterpret_cast<WorkerT*>(handle->data); + auto* wrk = reinterpret_cast<cmWorkerPoolWorker*>(handle->data); bool startFailed = false; { auto& Proc = wrk->Proc_; @@ -543,7 +467,7 @@ void cmWorkerPoolInternal::WorkerT::UVProcessStart(uv_async_t* handle) } } -void cmWorkerPoolInternal::WorkerT::UVProcessFinished() +void cmWorkerPoolWorker::UVProcessFinished() { { std::lock_guard<std::mutex> lock(Proc_.Mutex); @@ -555,6 +479,65 @@ void cmWorkerPoolInternal::WorkerT::UVProcessFinished() Proc_.Condition.notify_one(); } +/** + * @brief Private worker pool internals + */ +class cmWorkerPoolInternal +{ +public: + // -- Constructors + cmWorkerPoolInternal(cmWorkerPool* pool); + ~cmWorkerPoolInternal(); + + /** + * Runs the libuv loop. + */ + bool Process(); + + /** + * Clear queue and abort threads. + */ + void Abort(); + + /** + * Push a job to the queue and notify a worker. + */ + bool PushJob(cmWorkerPool::JobHandleT&& jobHandle); + + /** + * Worker thread main loop method. + */ + void Work(unsigned int workerIndex); + + // -- Request slots + static void UVSlotBegin(uv_async_t* handle); + static void UVSlotEnd(uv_async_t* handle); + +public: + // -- UV loop +#ifdef CMAKE_UV_SIGNAL_HACK + std::unique_ptr<cmUVSignalHackRAII> UVHackRAII; +#endif + std::unique_ptr<uv_loop_t> UVLoop; + cm::uv_async_ptr UVRequestBegin; + cm::uv_async_ptr UVRequestEnd; + + // -- Thread pool and job queue + std::mutex Mutex; + bool Processing = false; + bool Aborting = false; + bool FenceProcessing = false; + unsigned int WorkersRunning = 0; + unsigned int WorkersIdle = 0; + unsigned int JobsProcessing = 0; + std::deque<cmWorkerPool::JobHandleT> Queue; + std::condition_variable Condition; + std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers; + + // -- References + cmWorkerPool* Pool = nullptr; +}; + void cmWorkerPool::ProcessResultT::reset() { ExitStatus = 0; @@ -652,11 +635,13 @@ void cmWorkerPoolInternal::UVSlotBegin(uv_async_t* handle) // Create workers gint.Workers.reserve(num); for (unsigned int ii = 0; ii != num; ++ii) { - gint.Workers.emplace_back(cm::make_unique<WorkerT>(ii)); + gint.Workers.emplace_back( + cm::make_unique<cmWorkerPoolWorker>(*gint.UVLoop)); } - // Start workers - for (auto& wrk : gint.Workers) { - wrk->Start(&gint); + // Start worker threads + for (unsigned int ii = 0; ii != num; ++ii) { + gint.Workers[ii]->SetThread( + std::thread(&cmWorkerPoolInternal::Work, &gint, ii)); } } // Destroy begin request @@ -672,8 +657,9 @@ void cmWorkerPoolInternal::UVSlotEnd(uv_async_t* handle) gint.UVRequestEnd.reset(); } -void cmWorkerPoolInternal::Work(WorkerT* worker) +void cmWorkerPoolInternal::Work(unsigned int workerIndex) { + cmWorkerPool::JobHandleT jobHandle; std::unique_lock<std::mutex> uLock(Mutex); // Increment running workers count ++WorkersRunning; @@ -702,15 +688,15 @@ void cmWorkerPoolInternal::Work(WorkerT* worker) } // Pop next job from queue - worker->JobHandle() = std::move(Queue.front()); + jobHandle = std::move(Queue.front()); Queue.pop_front(); // Unlocked scope for job processing ++JobsProcessing; { uLock.unlock(); - worker->JobHandle()->Work(Pool, worker->Index()); // Process job - worker->JobHandle().reset(); // Destroy job + jobHandle->Work(Pool, workerIndex); // Process job + jobHandle.reset(); // Destroy job uLock.lock(); } --JobsProcessing; https://cmake.org/gitweb?p=cmake.git;a=commitdiff;h=9794b72d38d4aadef352d3ae80d7dee2fbfcb7fb commit 9794b72d38d4aadef352d3ae80d7dee2fbfcb7fb Author: Sebastian Holtermann <sebh...@xwmw.org> AuthorDate: Sun Apr 21 11:14:11 2019 +0200 Commit: Sebastian Holtermann <sebh...@xwmw.org> CommitDate: Wed Apr 24 12:32:58 2019 +0200 cmWorkerPool: Set worker thread count separately to Process() Don't pass the desired worker thread count to the `cmWorkerPool::Process()` method but set it separately with the new `cmWorkerPool::SetThreadCount` method. This allows calling `cmWorkerPool::Process()` repeatedly without having to pass the thread count every time. diff --git a/Source/cmQtAutoMocUic.cxx b/Source/cmQtAutoMocUic.cxx index 75c5d8a..005c27d 100644 --- a/Source/cmQtAutoMocUic.cxx +++ b/Source/cmQtAutoMocUic.cxx @@ -1186,6 +1186,7 @@ bool cmQtAutoMocUic::Init(cmMakefile* makefile) num = std::min<unsigned long>(num, ParallelMax); Base_.NumThreads = static_cast<unsigned int>(num); } + WorkerPool_.SetThreadCount(Base_.NumThreads); } // - Files and directories @@ -1482,15 +1483,12 @@ bool cmQtAutoMocUic::Process() if (!CreateDirectories()) { return false; } - - if (!WorkerPool_.Process(Base().NumThreads, this)) { + if (!WorkerPool_.Process(this)) { return false; } - if (JobError_) { return false; } - return SettingsFileWrite(); } diff --git a/Source/cmWorkerPool.cxx b/Source/cmWorkerPool.cxx index 464182c..75ca47a 100644 --- a/Source/cmWorkerPool.cxx +++ b/Source/cmWorkerPool.cxx @@ -468,6 +468,7 @@ public: // -- Thread pool and job queue std::mutex Mutex; + bool Processing = false; bool Aborting = false; bool FenceProcessing = false; unsigned int WorkersRunning = 0; @@ -591,7 +592,8 @@ cmWorkerPoolInternal::~cmWorkerPoolInternal() bool cmWorkerPoolInternal::Process() { - // Reset state + // Reset state flags + Processing = true; Aborting = false; // Initialize libuv asynchronous request UVRequestBegin.init(*UVLoop, &cmWorkerPoolInternal::UVSlotBegin, this); @@ -599,23 +601,27 @@ bool cmWorkerPoolInternal::Process() // Send begin request UVRequestBegin.send(); // Run libuv loop - return (uv_run(UVLoop.get(), UV_RUN_DEFAULT) == 0); + bool success = (uv_run(UVLoop.get(), UV_RUN_DEFAULT) == 0); + // Update state flags + Processing = false; + Aborting = false; + return success; } void cmWorkerPoolInternal::Abort() { - bool firstCall = false; + bool notifyThreads = false; // Clear all jobs and set abort flag { std::lock_guard<std::mutex> guard(Mutex); - if (!Aborting) { + if (Processing && !Aborting) { // Register abort and clear queue Aborting = true; Queue.clear(); - firstCall = true; + notifyThreads = true; } } - if (firstCall) { + if (notifyThreads) { // Wake threads Condition.notify_all(); } @@ -627,15 +633,13 @@ inline bool cmWorkerPoolInternal::PushJob(cmWorkerPool::JobHandleT&& jobHandle) if (Aborting) { return false; } - // Append the job to the queue Queue.emplace_back(std::move(jobHandle)); - // Notify an idle worker if there's one if (WorkersIdle != 0) { Condition.notify_one(); } - + // Return success return true; } @@ -743,19 +747,22 @@ cmWorkerPool::cmWorkerPool() cmWorkerPool::~cmWorkerPool() = default; -bool cmWorkerPool::Process(unsigned int threadCount, void* userData) +void cmWorkerPool::SetThreadCount(unsigned int threadCount) +{ + if (!Int_->Processing) { + ThreadCount_ = (threadCount > 0) ? threadCount : 1u; + } +} + +bool cmWorkerPool::Process(void* userData) { // Setup user data UserData_ = userData; - ThreadCount_ = (threadCount > 0) ? threadCount : 1u; - // Run libuv loop bool success = Int_->Process(); - // Clear user data UserData_ = nullptr; - ThreadCount_ = 0; - + // Return return success; } diff --git a/Source/cmWorkerPool.h b/Source/cmWorkerPool.h index 71c7d84..f08bb4f 100644 --- a/Source/cmWorkerPool.h +++ b/Source/cmWorkerPool.h @@ -50,12 +50,12 @@ public: JobT& operator=(JobT const&) = delete; /** - * @brief Virtual destructor. + * Virtual destructor. */ virtual ~JobT(); /** - * @brief Fence job flag + * Fence job flag * * Fence jobs require that: * - all jobs before in the queue have been processed @@ -66,7 +66,7 @@ public: protected: /** - * @brief Protected default constructor + * Protected default constructor */ JobT(bool fence = false) : Fence_(fence) @@ -125,12 +125,12 @@ public: }; /** - * @brief Job handle type + * Job handle type */ typedef std::unique_ptr<JobT> JobHandleT; /** - * @brief Fence job base class + * Fence job base class */ class JobFenceT : public JobT { @@ -144,8 +144,9 @@ public: }; /** - * @brief Fence job that aborts the worker pool. - * This class is useful as the last job in the job queue. + * Fence job that aborts the worker pool. + * + * Useful as the last job in the job queue. */ class JobEndT : JobFenceT { @@ -160,23 +161,29 @@ public: ~cmWorkerPool(); /** - * @brief Blocking function that starts threads to process all Jobs in - * the queue. + * Number of worker threads. + */ + unsigned int ThreadCount() const { return ThreadCount_; } + + /** + * Set the number of worker threads. * - * This method blocks until a job calls the Abort() method. - * @arg threadCount Number of threads to process jobs. - * @arg userData Common user data pointer available in all Jobs. + * Calling this method during Process() has no effect. */ - bool Process(unsigned int threadCount, void* userData = nullptr); + void SetThreadCount(unsigned int threadCount); /** - * Number of worker threads passed to Process(). - * Only valid during Process(). + * Blocking function that starts threads to process all Jobs in the queue. + * + * This method blocks until a job calls the Abort() method. + * @arg threadCount Number of threads to process jobs. + * @arg userData Common user data pointer available in all Jobs. */ - unsigned int ThreadCount() const { return ThreadCount_; } + bool Process(void* userData = nullptr); /** * User data reference passed to Process(). + * * Only valid during Process(). */ void* UserData() const { return UserData_; } @@ -184,14 +191,14 @@ public: // -- Job processing interface /** - * @brief Clears the job queue and aborts all worker threads. + * Clears the job queue and aborts all worker threads. * * This method is thread safe and can be called from inside a job. */ void Abort(); /** - * @brief Push job to the queue. + * Push job to the queue. * * This method is thread safe and can be called from inside a job or before * Process(). @@ -199,7 +206,7 @@ public: bool PushJob(JobHandleT&& jobHandle); /** - * @brief Push job to the queue + * Push job to the queue * * This method is thread safe and can be called from inside a job or before * Process(). @@ -212,7 +219,7 @@ public: private: void* UserData_ = nullptr; - unsigned int ThreadCount_ = 0; + unsigned int ThreadCount_ = 1; std::unique_ptr<cmWorkerPoolInternal> Int_; }; ----------------------------------------------------------------------- Summary of changes: Source/cmQtAutoMocUic.cxx | 6 +- Source/cmWorkerPool.cxx | 261 ++++++++++++++++++++++------------------------ Source/cmWorkerPool.h | 47 +++++---- 3 files changed, 156 insertions(+), 158 deletions(-) hooks/post-receive -- CMake _______________________________________________ Cmake-commits mailing list Cmake-commits@cmake.org https://cmake.org/mailman/listinfo/cmake-commits