I made an alternative implementation using a thread pool and a queue,
based on the example at http://clojure.org/concurrent_programming

In short, your pmapall and the pool-based implementation (below) both
give approximately perfect scaling on my 4/8-core system (Intel i7 920
with HT). Both keep all cores close to fully loaded and give a speedup
of about 4.4 compared to single-threaded. This seems about right: the
CPU has four physical cores and gets a few percent extra performance
from the virtual cores, so the speedup is approximately linear in the
number of cores.

pmap-pool may be a tiny bit faster than pmapall, but they are so close
that I can't really tell.

It is possible that there is some sort of synchronization overhead on
your 48-core machine. After all, 95% of the tasks are practically
no-ops: they cost little more than a single function call. There are
only 48 tasks in your test that actually require computation, so each
core will do a bunch of no-ops and perhaps one "real" task.
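If per-transaction overhead is what dominates, one thing that might help
is to let each worker grab several work units per dosync instead of one
(simple batching, a technique swapped in here, not something pmapall or
the pool-based implementation below does). A minimal sketch;
pmap-pool-chunked and its nthreads and chunk-size parameters are
hypothetical, and like the pool-based version it does not preserve
input order:

```clojure
(import '(java.util.concurrent Executors))

;; Hypothetical variant of the pool-based approach: each worker removes up
;; to chunk-size work units per transaction, so one dosync is shared by
;; several (possibly near-noop) calls to f. Results come back grouped by
;; worker, not in input order.
(defn pmap-pool-chunked [f coll nthreads chunk-size]
  (let [queue (ref (seq coll))
        pool  (Executors/newFixedThreadPool (int nthreads))
        ;; Take up to chunk-size items in one transaction; nil when empty.
        take-chunk! (fn []
                      (dosync
                        (let [q     @queue
                              chunk (vec (take chunk-size q))]
                          (ref-set queue (seq (drop chunk-size q)))
                          (seq chunk))))
        ;; f runs outside the transaction, so a retry never repeats work.
        worker (fn []
                 (loop [acc []]
                   (if-let [chunk (take-chunk!)]
                     (recur (into acc (map f chunk)))
                     acc)))]
    (try
      ;; invokeAll blocks until every worker has drained the queue.
      (doall (mapcat #(.get %) (.invokeAll pool (repeat nthreads worker))))
      (finally (.shutdown pool)))))
```

A chunk-size of 1 behaves like one transaction per item; larger chunks
trade fewer transactions for coarser load balancing, since one worker
may end up holding several heavy items in the same chunk.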

In wall-clock time, a single i7 920 runs the test just as fast as your
48 cores. I don't expect that's representative of what your 48 cores
can do.

I suggest:
* Increase the test size and/or the density of "heavy" tasks.
* Let the "light" tasks do a bit more computation, at least enough to
  pay for the overhead of calling them.
* Start with a smaller number of threads, and see where it stops
  scaling linearly.
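For the last suggestion, a rough harness might look like this. It is
only a sketch: run-on-n-threads, elapsed-ms and scaling-sweep are
hypothetical helpers, and the work is split statically into one
interleaved stripe per thread instead of being pulled from a shared
queue, so it measures how a fixed pool scales rather than pmapall or
pmap-pool themselves.

```clojure
(import '(java.util.concurrent Executors))

;; Hypothetical helper: apply f to every element of coll on a fixed pool of
;; nthreads workers. coll is split statically into nthreads interleaved
;; stripes (elements i, i+n, i+2n, ...), one stripe per worker.
(defn run-on-n-threads [nthreads f coll]
  (let [pool  (Executors/newFixedThreadPool (int nthreads))
        v     (vec coll)
        tasks (vec (for [i (range nthreads)]
                     (fn [] (doall (map f (take-nth nthreads (drop i v)))))))]
    (try
      (doall (mapcat #(.get %) (.invokeAll pool tasks)))
      (finally (.shutdown pool)))))

;; Wall-clock milliseconds for one call of thunk.
(defn elapsed-ms [thunk]
  (let [start (System/nanoTime)]
    (thunk)
    (/ (- (System/nanoTime) start) 1e6)))

;; Time the same workload for each thread count; :speedup is relative to
;; the first entry in thread-counts (normally 1).
(defn scaling-sweep [f coll thread-counts]
  (let [timings (mapv (fn [n] [n (elapsed-ms #(run-on-n-threads n f coll))])
                      thread-counts)
        base    (second (first timings))]
    (mapv (fn [[n ms]] {:threads n :ms ms :speedup (/ base ms)})
          timings)))
```

With a made-up workload such as
(defn busy-work [x] (reduce + (range (* 10000 (mod x 10))))), calling
(scaling-sweep busy-work (range 2000) [1 2 4 8 16]) returns one map per
thread count; where :speedup stops growing roughly in step with
:threads is where scaling stops being linear. A warm-up run first helps,
since the 1-thread baseline otherwise also pays for JIT compilation.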


Threadpool/queue-based implementation:

(import '(java.util.concurrent Executors))

(defn pmap-pool [f coll]
  (let [queue    (ref coll)  ;; shared queue of work units
        nthreads (.availableProcessors (Runtime/getRuntime))
        pool     (Executors/newFixedThreadPool nthreads)
        tasks    (map (fn [_]
                        (fn [] ;; one task per thread
                          ;; collect results per thread to minimize synchronization
                          (let [local-res (atom [])]
                            (while (seq @queue)
                              ;; the queue may be emptied between 'while' and
                              ;; 'dosync', in which case dosync returns nil
                              (when-let [q (dosync
                                             ;; grab a work unit, update the queue;
                                             ;; return the seq rather than its first
                                             ;; item, so nil/false items are not
                                             ;; mistaken for an empty queue
                                             (when-let [q (seq @queue)]
                                               (alter queue next)
                                               q))]
                                (swap! local-res conj (f (first q)))))
                            @local-res)))
                      (range nthreads))
        ;; invokeAll starts all tasks and blocks until they complete;
        ;; each future's value is one thread's result vector.
        ;; NOTE: results are grouped per thread, not in input order.
        results  (try
                   (doall (apply concat (map #(.get %) (.invokeAll pool tasks))))
                   (finally (.shutdown pool)))]
    ;; sanity check; the :done test is specific to this benchmark and
    ;; assumes f returns :done
    (when-not (and (empty? @queue)
                   (= (count results) (count coll))
                   (every? #(= % :done) results))
      (println "ERROR: queue" (count @queue) "#results" (count results)))
    results))
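pmap-pool returns results grouped by worker thread rather than in input
order, so it is not a drop-in replacement for pmap (harmless in the
benchmark, where only the timing matters). If order does matter, one
option is to tag each work unit with its index and sort at the end. A
minimal sketch; pmap-pool-ordered is a hypothetical name, it still
takes one work unit per transaction, and the final sort adds an
O(n log n) step:

```clojure
(import '(java.util.concurrent Executors))

;; Hypothetical order-preserving variant: work units are [index item]
;; pairs, workers return [index result] pairs, and the pairs are sorted
;; by index once all workers are done.
(defn pmap-pool-ordered [f coll]
  (let [queue     (ref (seq (map-indexed vector coll)))
        nthreads  (.availableProcessors (Runtime/getRuntime))
        pool      (Executors/newFixedThreadPool nthreads)
        ;; Pop one [index item] pair in a transaction; nil when empty.
        take-one! (fn []
                    (dosync
                      (when-let [[w & more] @queue]
                        (ref-set queue more)
                        w)))
        worker    (fn []
                    (loop [acc []]
                      (if-let [[i x] (take-one!)]
                        (recur (conj acc [i (f x)]))
                        acc)))]
    (try
      (->> (.invokeAll pool (repeat nthreads worker))
           (mapcat #(.get %))
           (sort-by first)   ;; restore input order
           (map second)
           doall)
      (finally (.shutdown pool)))))
```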

Results on an i7 920, 4 cores/8 threads (hyperthreading), Ubuntu 10.10:

user=> (time (last (map fast-or-slow inputs)))
"Elapsed time: 161891.732036 msecs", 100% CPU (out of 800% possible)

user=> (time (last (pmap fast-or-slow inputs)))
"Elapsed time: 163139.249677 msecs", 100% CPU
pmap has no effect on my system; it won't use more than one core.

user=> (time (last (pmapall fast-or-slow inputs)))
"Elapsed time: 37710.349712 msecs", ~793% CPU

user=> (time (last (pmap-pool fast-or-slow inputs)))
"Elapsed time: 36393.132824 msecs", ~795% CPU

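The speedup figures can be recomputed from these timings. A small
sketch; the keys are labels chosen here, and the numbers are the
elapsed times copied from the runs above:

```clojure
;; Wall-clock times (ms) copied from the runs above.
(def timings-ms
  {:map       161891.732036
   :pmap      163139.249677
   :pmapall   37710.349712
   :pmap-pool 36393.132824})

;; Speedup of each variant relative to plain, single-threaded map.
(defn speedups [timings]
  (let [base (:map timings)]
    (into (sorted-map)
          (for [[k ms] timings]
            [k (/ base ms)]))))

(speedups timings-ms)
```

This gives roughly 0.99 for pmap, 4.29 for pmapall and 4.45 for
pmap-pool. Both parallel versions land slightly above 4, which fits four
physical cores plus a modest gain from hyperthreading.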
-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.