tberghammer updated this revision to Diff 37566.
tberghammer added a comment.

Create optional std::async based implementation


http://reviews.llvm.org/D13727

Files:
  include/lldb/Utility/TaskPool.h
  source/Utility/CMakeLists.txt
  source/Utility/TaskPool.cpp
  unittests/Utility/CMakeLists.txt
  unittests/Utility/TaskPoolTest.cpp

Index: unittests/Utility/TaskPoolTest.cpp
===================================================================
--- /dev/null
+++ unittests/Utility/TaskPoolTest.cpp
@@ -0,0 +1,62 @@
+#include "gtest/gtest.h"
+
+#include "lldb/Utility/TaskPool.h"
+
+TEST (TaskPoolTest, AddTask)
+{
+    auto fn = [](int x) { return x * x + 1; };
+
+    auto f1 = TaskPool::AddTask(fn, 1);
+    auto f2 = TaskPool::AddTask(fn, 2);
+    auto f3 = TaskPool::AddTask(fn, 3);
+    auto f4 = TaskPool::AddTask(fn, 4);
+
+    ASSERT_EQ (10, f3.get());
+    ASSERT_EQ ( 2, f1.get());
+    ASSERT_EQ (17, f4.get());
+    ASSERT_EQ ( 5, f2.get());
+}
+
+TEST (TaskPoolTest, RunTasks)
+{
+    std::vector<int> r(4);
+
+    auto fn = [](int x, int& y) { y = x * x + 1; };
+    
+    TaskPool::RunTasks(
+        [fn, &r]() { fn(1, r[0]); },
+        [fn, &r]() { fn(2, r[1]); },
+        [fn, &r]() { fn(3, r[2]); },
+        [fn, &r]() { fn(4, r[3]); }
+    );
+
+    ASSERT_EQ ( 2, r[0]);
+    ASSERT_EQ ( 5, r[1]);
+    ASSERT_EQ (10, r[2]);
+    ASSERT_EQ (17, r[3]);
+}
+
+TEST (TaskPoolTest, TaskRunner)
+{
+    auto fn = [](int x) { return std::make_pair(x, x * x); };
+
+    TaskRunner<std::pair<int, int>> tr;
+    tr.AddTask(fn, 1);
+    tr.AddTask(fn, 2);
+    tr.AddTask(fn, 3);
+    tr.AddTask(fn, 4);
+
+    int count = 0;
+    while (true)
+    {
+        auto f = tr.WaitForNextCompletedTask();
+        if (!f.valid())
+            break;
+
+        ++count;
+        std::pair<int, int> v = f.get();
+        ASSERT_EQ (v.first * v.first, v.second);
+    }
+
+    ASSERT_EQ(4, count);
+}
Index: unittests/Utility/CMakeLists.txt
===================================================================
--- unittests/Utility/CMakeLists.txt
+++ unittests/Utility/CMakeLists.txt
@@ -1,4 +1,5 @@
 add_lldb_unittest(UtilityTests
   StringExtractorTest.cpp
+  TaskPoolTest.cpp
   UriParserTest.cpp
   )
Index: source/Utility/TaskPool.cpp
===================================================================
--- /dev/null
+++ source/Utility/TaskPool.cpp
@@ -0,0 +1,92 @@
+//===--------------------- TaskPool.cpp -------------------------*- C++ -*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+#include "lldb/Utility/TaskPool.h"
+
+#ifdef USE_HAND_WRITTEN_THREAD_POOL
+
+namespace
+{
+    class TaskPoolImpl
+    {
+    public:
+        static TaskPoolImpl&
+        GetInstance();
+
+        void
+        AddTask(std::function<void()>&& task_fn);
+
+    private:
+        TaskPoolImpl(uint32_t num_threads);
+
+        static void
+        Worker(TaskPoolImpl* pool);
+
+        std::queue<std::function<void()>> m_tasks;
+        std::mutex                        m_tasks_mutex;
+        uint32_t                          m_thread_count;
+    };
+
+} // end of anonymous namespace
+
+TaskPoolImpl&
+TaskPoolImpl::GetInstance()
+{
+    static TaskPoolImpl g_task_pool_impl(std::thread::hardware_concurrency());
+    return g_task_pool_impl;
+}
+
+void
+TaskPool::AddTaskImpl(std::function<void()>&& task_fn)
+{
+    TaskPoolImpl::GetInstance().AddTask(std::move(task_fn));
+}
+
+TaskPoolImpl::TaskPoolImpl(uint32_t num_threads) :
+    m_thread_count(0)
+{
+}
+
+void
+TaskPoolImpl::AddTask(std::function<void()>&& task_fn)
+{
+    static const uint32_t max_threads = std::thread::hardware_concurrency();
+
+    std::unique_lock<std::mutex> lock(m_tasks_mutex);
+    m_tasks.emplace(std::move(task_fn));
+    if (m_thread_count < max_threads)
+    {
+        m_thread_count++;
+        lock.unlock();
+
+        std::thread (Worker, this).detach();
+    }
+}
+
+void
+TaskPoolImpl::Worker(TaskPoolImpl* pool)
+{
+    while (true)
+    {
+        std::unique_lock<std::mutex> lock(pool->m_tasks_mutex);
+        if (pool->m_tasks.empty())
+        {
+            pool->m_thread_count--;
+            break;
+        }
+
+        std::function<void()> f = pool->m_tasks.front();
+        pool->m_tasks.pop();
+        lock.unlock();
+
+        f();
+    }
+}
+
+#endif // USE_HAND_WRITTEN_THREAD_POOL
Index: source/Utility/CMakeLists.txt
===================================================================
--- source/Utility/CMakeLists.txt
+++ source/Utility/CMakeLists.txt
@@ -14,6 +14,7 @@
   StringExtractor.cpp
   StringExtractorGDBRemote.cpp
   StringLexer.cpp
+  TaskPool.cpp
   TimeSpecTimeout.cpp
   UriParser.cpp
   )
Index: include/lldb/Utility/TaskPool.h
===================================================================
--- /dev/null
+++ include/lldb/Utility/TaskPool.h
@@ -0,0 +1,212 @@
+//===--------------------- TaskPool.h ---------------------------*- C++ -*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+#ifndef utility_TaskPool_h_
+#define utility_TaskPool_h_
+
+#include <cassert>
+#include <cstdint>
+#include <future>
+#include <list>
+#include <queue>
+#include <thread>
+#include <vector>
+
+// Define the macro USE_HAND_WRITTEN_THREAD_POOL for your platform if you want to use the thread
+// pool based implementation of TaskPool::AddTask instead of using std::async (all ofther function
+// in this file implemented using TaskPool::AddTask).
+#ifndef _WIN32
+#define USE_HAND_WRITTEN_THREAD_POOL
+#endif
+
+// Global TaskPool class for running tasks in parallel on a set of worker thread created the first
+// time the task pool is used. The TaskPool provide no gurantee about the order the task will be run
+// and about what tasks will run in parrallel. Non of the task added to the task pool should block
+// on something (mutex, future, condition variable) what will be set only by the completion of an
+// other task on the task pool as they may run on the same thread sequentally.
+class TaskPool
+{
+public:
+    // Add a new task to the thread pool and return a std::future belonging to the newly created
+    // task. The caller of this function has to wait on the future for this task to complete.
+    template<typename F, typename... Args>
+    static std::future<typename std::result_of<F(Args...)>::type>
+    AddTask(F&& f, Args&&... args);
+
+    // Run all of the specified tasks on the thread pool and wait until all of them are finished
+    // before returning. This method is intended to be used for small number tasks where listing
+    // them as function arguments is acceptable. For running large number of tasks you should use
+    // AddTask for each task and then call wait() on each returned future.
+    template<typename... T>
+    static void
+    RunTasks(T&&... tasks);
+
+private:
+    TaskPool() = delete;
+
+    template<typename... T>
+    struct RunTaskImpl;
+
+    static void
+    AddTaskImpl(std::function<void()>&& task_fn);
+};
+
+// Wrapper class around the global TaskPool implementation to make it possible to create a set of
+// tasks and then wait for the tasks to be completed by the WaitForNextCompletedTask call. This
+// class should be used when WaitForNextCompletedTask is needed because this class add no other
+// extra functionality to the TaskPool class and it have a very minor performance overhead.
+template <typename T> // The return type of the tasks what will be added to this task runner
+class TaskRunner
+{
+public:
+    // Add a task to the task runner what will also add the task to the global TaskPool. The
+    // function don't return the std::future for the task because it will be supplied by the
+    // WaitForNextCompletedTask after the task is completed.
+    template<typename F, typename... Args>
+    void
+    AddTask(F&& f, Args&&... args);
+
+    // Wait for the next task in this task runner to finish and then return the std::future what
+    // belongs to the finished task. If there is no task in this task runner (neither pending nor
+    // comleted) then this function will return an invalid future. Usually this function should be
+    // called in a loop processing the results of the tasks until it returns an invalid std::future
+    // what means that all task in this task runner is completed.
+    std::future<T>
+    WaitForNextCompletedTask();
+
+    // Convenience method to wait for all task in this TaskRunner to finish. Do NOT use this class
+    // just because of this method. Use TaskPool instead and wait for each std::future returned by
+    // AddTask in a loop.
+    void
+    WaitForAllTasks();
+
+private:
+    std::list<std::future<T>> m_ready;
+    std::list<std::future<T>> m_pending;
+    std::mutex                m_mutex;
+    std::condition_variable   m_cv;
+};
+
+template<typename F, typename... Args>
+std::future<typename std::result_of<F(Args...)>::type>
+TaskPool::AddTask(F&& f, Args&&... args)
+{
+#ifdef USE_HAND_WRITTEN_THREAD_POOL
+    auto task_sp = std::make_shared<std::packaged_task<typename std::result_of<F(Args...)>::type()>>(
+        std::bind(std::forward<F>(f), std::forward<Args>(args)...));
+
+    AddTaskImpl([task_sp]() { (*task_sp)(); });
+
+    return task_sp->get_future();
+#else // USE_HAND_WRITTEN_THREAD_POOL
+    return std::async(std::launch::async, std::forward<F>(f), std::forward<Args>(args)...);
+#endif // USE_HAND_WRITTEN_THREAD_POOL
+}
+
+template<typename... T>
+void
+TaskPool::RunTasks(T&&... tasks)
+{
+    RunTaskImpl<T...>::Run(std::forward<T>(tasks)...);
+}
+
+template<typename Head, typename... Tail>
+struct TaskPool::RunTaskImpl<Head, Tail...>
+{
+    static void
+    Run(Head&& h, Tail&&... t)
+    {
+        auto f = AddTask(std::forward<Head>(h));
+        RunTaskImpl<Tail...>::Run(std::forward<Tail>(t)...);
+        f.wait();
+    }
+};
+
+template<>
+struct TaskPool::RunTaskImpl<>
+{
+    static void
+    Run() {}
+};
+
+template <typename T>
+template<typename F, typename... Args>
+void
+TaskRunner<T>::AddTask(F&& f, Args&&... args)
+{
+    std::unique_lock<std::mutex> lock(m_mutex);
+    auto it = m_pending.emplace(m_pending.end());
+    *it = std::move(TaskPool::AddTask(
+        [this, it](F f, Args... args)
+        {
+            T&& r = f(std::forward<Args>(args)...);
+
+            std::unique_lock<std::mutex> lock(this->m_mutex);
+            this->m_ready.emplace_back(std::move(*it));
+            this->m_pending.erase(it);
+            lock.unlock();
+
+            this->m_cv.notify_one();
+            return r;
+        },
+        std::forward<F>(f),
+        std::forward<Args>(args)...));
+}
+
+template <>
+template<typename F, typename... Args>
+void
+TaskRunner<void>::AddTask(F&& f, Args&&... args)
+{
+    std::unique_lock<std::mutex> lock(m_mutex);
+    auto it = m_pending.emplace(m_pending.end());
+    *it = std::move(TaskPool::AddTask(
+        [this, it](F f, Args... args)
+        {
+            f(std::forward<Args>(args)...);
+
+            std::unique_lock<std::mutex> lock(this->m_mutex);
+            this->m_ready.emplace_back(std::move(*it));
+            this->m_pending.erase(it);
+            lock.unlock();
+
+            this->m_cv.notify_one();
+        },
+        std::forward<F>(f),
+        std::forward<Args>(args)...));
+}
+
+template <typename T>
+std::future<T>
+TaskRunner<T>::WaitForNextCompletedTask()
+{
+    std::unique_lock<std::mutex> lock(m_mutex);
+    if (m_ready.empty() && m_pending.empty())
+        return std::future<T>(); // No more tasks
+
+    if (m_ready.empty())
+        m_cv.wait(lock, [this](){ return !this->m_ready.empty(); });
+
+    std::future<T> res = std::move(m_ready.front());
+    m_ready.pop_front();
+    
+    lock.unlock();
+    res.wait();
+
+    return std::move(res);
+}
+
+template <typename T>
+void
+TaskRunner<T>::WaitForAllTasks()
+{
+    while (WaitForNextCompletedTask().valid());
+}
+
+#endif // #ifndef utility_TaskPool_h_
_______________________________________________
lldb-commits mailing list
lldb-commits@lists.llvm.org
http://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits

Reply via email to