Title: [188981] trunk/Source/WTF
Revision
188981
Author
ander...@apple.com
Date
2015-08-26 12:44:10 -0700 (Wed, 26 Aug 2015)

Log Message

Add and implement WorkQueue::concurrentApply
https://bugs.webkit.org/show_bug.cgi?id=148488

Reviewed by Geoffrey Garen.

WorkQueue::concurrentApply is modeled after dispatch_apply, and on Cocoa it uses dispatch_apply directly.
For other ports there's a generic concurrentApply implemented using our threading primitives.

* wtf/NeverDestroyed.h:
(WTF::LazyNeverDestroyed::operator->):
* wtf/WorkQueue.cpp:
(WTF::WorkQueue::concurrentApply):
* wtf/WorkQueue.h:
* wtf/cocoa/WorkQueueCocoa.cpp:
(WTF::WorkQueue::concurrentApply):

Modified Paths

Diff

Modified: trunk/Source/WTF/ChangeLog (188980 => 188981)


--- trunk/Source/WTF/ChangeLog	2015-08-26 19:30:50 UTC (rev 188980)
+++ trunk/Source/WTF/ChangeLog	2015-08-26 19:44:10 UTC (rev 188981)
@@ -1,3 +1,21 @@
+2015-08-26  Anders Carlsson  <ander...@apple.com>
+
+        Add and implement WorkQueue::concurrentApply
+        https://bugs.webkit.org/show_bug.cgi?id=148488
+
+        Reviewed by Geoffrey Garen.
+
+        WorkQueue::concurrentApply is modeled after dispatch_apply, and on Cocoa it uses dispatch_apply directly.
+        For other ports there's a generic concurrentApply implemented using our threading primitives.
+        
+        * wtf/NeverDestroyed.h:
+        (WTF::LazyNeverDestroyed::operator->):
+        * wtf/WorkQueue.cpp:
+        (WTF::WorkQueue::concurrentApply):
+        * wtf/WorkQueue.h:
+        * wtf/cocoa/WorkQueueCocoa.cpp:
+        (WTF::WorkQueue::concurrentApply):
+
 2015-08-25  Filip Pizlo  <fpi...@apple.com>
 
         Node::origin should be able to tell you if it's OK to exit

Modified: trunk/Source/WTF/wtf/NeverDestroyed.h (188980 => 188981)


--- trunk/Source/WTF/wtf/NeverDestroyed.h	2015-08-26 19:30:50 UTC (rev 188980)
+++ trunk/Source/WTF/wtf/NeverDestroyed.h	2015-08-26 19:44:10 UTC (rev 188981)
@@ -94,6 +94,8 @@
     operator T&() { return *asPtr(); }
     T& get() { return *asPtr(); }
 
+    T* operator->() { return asPtr(); } // Arrow access to the wrapped T; yields the pointer that get() dereferences.
+
 private:
     typedef typename std::remove_const<T>::type* PointerType;
 

Modified: trunk/Source/WTF/wtf/WorkQueue.cpp (188980 => 188981)


--- trunk/Source/WTF/wtf/WorkQueue.cpp	2015-08-26 19:30:50 UTC (rev 188980)
+++ trunk/Source/WTF/wtf/WorkQueue.cpp	2015-08-26 19:44:10 UTC (rev 188981)
@@ -26,7 +26,14 @@
 #include "config.h"
 #include "WorkQueue.h"
 
-#include "Ref.h"
+#include <mutex>
+#include <wtf/BinarySemaphore.h>
+#include <wtf/MessageQueue.h>
+#include <wtf/NeverDestroyed.h>
+#include <wtf/NumberOfCores.h>
+#include <wtf/Ref.h>
+#include <wtf/Threading.h>
+#include <wtf/text/WTFString.h>
 
 namespace WTF {
 
@@ -45,4 +52,105 @@
     platformInvalidate();
 }
 
+#if !PLATFORM(COCOA)
+// Generic counterpart of dispatch_apply: calls function(index) for every index in [0, iterations) on pool threads plus the calling thread, returning once all participants are done.
+void WorkQueue::concurrentApply(size_t iterations, const std::function<void (size_t index)>& function)
+{
+    if (!iterations)
+        return;
+
+    if (iterations == 1) {
+        function(0);
+        return;
+    }
+
+    class ThreadPool {
+    public:
+        ThreadPool()
+        {
+            // The calling thread takes part in the work too, so one core needs no pool thread.
+            unsigned threadCount = numberOfProcessorCores() - 1;
+
+            m_workers.reserveInitialCapacity(threadCount);
+            for (unsigned i = 0; i < threadCount; ++i) {
+                m_workers.append(createThread(String::format("ThreadPool Worker %u", i).utf8().data(), [this] {
+                    threadBody();
+                }));
+            }
+        }
+
+        size_t workerCount() const { return m_workers.size(); }
+
+        void dispatch(const std::function<void ()>* function)
+        {
+            LockHolder holder(m_lock);
+
+            m_queue.append(function);
+            m_condition.notifyOne();
+        }
+
+    private:
+        NO_RETURN void threadBody()
+        {
+            while (true) {
+                const std::function<void ()>* function;
+
+                {
+                    LockHolder holder(m_lock);
+
+                    m_condition.wait(m_lock, [this] {
+                        return !m_queue.isEmpty();
+                    });
+
+                    function = m_queue.takeFirst();
+                }
+
+                (*function)();
+            }
+        }
+
+        Lock m_lock;
+        Condition m_condition;
+        Deque<const std::function<void ()>*> m_queue;
+
+        Vector<ThreadIdentifier> m_workers;
+    };
+
+    static LazyNeverDestroyed<ThreadPool> threadPool;
+    static std::once_flag onceFlag;
+    std::call_once(onceFlag, [] {
+        threadPool.construct();
+    });
+
+    // The caller participates as well, so at most iterations - 1 pool threads are useful.
+    const size_t workerCount = std::min(iterations - 1, threadPool->workerCount());
+
+    std::atomic<size_t> currentIndex(0);
+    std::atomic<size_t> activeThreads(workerCount + 1);
+
+    Condition condition;
+    Lock lock;
+
+    std::function<void ()> applier = [&] {
+        size_t index;
+        // Claim indices one at a time until the shared counter runs past the end.
+        while ((index = currentIndex++) < iterations)
+            function(index);
+
+        // Retire this thread while holding the lock. The caller owns |lock| and |condition| on its stack
+        // and returns as soon as it observes zero, so the count must not reach zero before we hold the lock.
+        LockHolder holder(lock);
+        if (!--activeThreads)
+            condition.notifyOne();
+    };
+
+    for (size_t i = 0; i < workerCount; ++i)
+        threadPool->dispatch(&applier);
+    applier();
+
+    LockHolder holder(lock);
+    condition.wait(lock, [&] { return !activeThreads; });
 }
+#endif
+
+}

Modified: trunk/Source/WTF/wtf/WorkQueue.h (188980 => 188981)


--- trunk/Source/WTF/wtf/WorkQueue.h	2015-08-26 19:30:50 UTC (rev 188980)
+++ trunk/Source/WTF/wtf/WorkQueue.h	2015-08-26 19:44:10 UTC (rev 188981)
@@ -71,6 +71,8 @@
     WTF_EXPORT_PRIVATE virtual void dispatch(std::function<void ()>) override;
     WTF_EXPORT_PRIVATE void dispatchAfter(std::chrono::nanoseconds, std::function<void ()>);
 
+    WTF_EXPORT_PRIVATE static void concurrentApply(size_t iterations, const std::function<void (size_t index)>&);
+
 #if OS(DARWIN)
     dispatch_queue_t dispatchQueue() const { return m_dispatchQueue; }
 #elif PLATFORM(GTK)

Modified: trunk/Source/WTF/wtf/cocoa/WorkQueueCocoa.cpp (188980 => 188981)


--- trunk/Source/WTF/wtf/cocoa/WorkQueueCocoa.cpp	2015-08-26 19:30:50 UTC (rev 188980)
+++ trunk/Source/WTF/wtf/cocoa/WorkQueueCocoa.cpp	2015-08-26 19:44:10 UTC (rev 188981)
@@ -102,4 +102,11 @@
     dispatch_release(m_dispatchQueue);
 }
 
+void WorkQueue::concurrentApply(size_t iterations, const std::function<void (size_t index)>& function)
+{
+    // Fan the index range out on the default-priority global queue; each invocation just forwards to function.
+    dispatch_queue_t globalQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
+    dispatch_apply(iterations, globalQueue, ^(size_t index) { function(index); });
 }
+
+}
_______________________________________________
webkit-changes mailing list
webkit-changes@lists.webkit.org
https://lists.webkit.org/mailman/listinfo/webkit-changes

Reply via email to