On Monday 10 January 2011 11:05:27, Francesc Alted wrote:
> Also, I'd like to try out the new thread scheduling that you
> suggested to me privately (i.e. T0T1T0T1...  vs T0T0...T1T1...).

I've just implemented the new partition schema in numexpr 
(T0T0...T1T1..., the original being T0T1T0T1...).  I'm attaching the 
patch for this.  The results are a bit confusing.  For example, using 
the attached benchmark (poly.py), I get these results on a common 
dual-core, non-NUMA machine:

With the T0T1...T0T1... (original) schema:

Computing: '((.25*x + .75)*x - 1.5)*x - 2' with 100000000 points
Using numpy:
*** Time elapsed: 3.497
Using numexpr:
*** Time elapsed for 1 threads: 1.279000
*** Time elapsed for 2 threads: 0.688000

With the T0T0...T1T1... (new) schema:

Computing: '((.25*x + .75)*x - 1.5)*x - 2' with 100000000 points
Using numpy:
*** Time elapsed: 3.454
Using numexpr:
*** Time elapsed for 1 threads: 1.268000
*** Time elapsed for 2 threads: 0.754000

which is around 10% slower (2 threads) than the original partition.
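
For reference, here is a small, purely illustrative Python sketch of 
which blocks each thread visits under the two schedules (the function 
names and the block/thread counts are made up for the example, this is 
not numexpr code):

def interleaved_blocks(tid, nthreads, nblocks):
    """Original T0 T1 T0 T1 ... schedule: threads take blocks in turn."""
    return [b for b in range(nblocks) if b % nthreads == tid]

def contiguous_blocks(tid, nthreads, nblocks):
    """New T0 T0 ... T1 T1 ... schedule: each thread gets one contiguous chunk."""
    th_nblocks = -(-nblocks // nthreads)      # ceiling division
    start = tid * th_nblocks
    return list(range(start, min(start + th_nblocks, nblocks)))

if __name__ == '__main__':
    nthreads, nblocks = 2, 8
    for tid in range(nthreads):
        print("T%d interleaved: %s" % (tid, interleaved_blocks(tid, nthreads, nblocks)))
        print("T%d contiguous:  %s" % (tid, contiguous_blocks(tid, nthreads, nblocks)))

With 2 threads and 8 blocks this prints [0, 2, 4, 6] vs [0, 1, 2, 3] for 
T0 and [1, 3, 5, 7] vs [4, 5, 6, 7] for T1, i.e. the interleaved vs 
contiguous layouts compared above.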

The results are a bit different on a NUMA machine (8 physical cores, 16 
logical cores via hyper-threading):

With the T0T1...T0T1... (original) partition:

Computing: '((.25*x + .75)*x - 1.5)*x - 2' with 100000000 points
Using numpy:
*** Time elapsed: 3.005
Using numexpr:
*** Time elapsed for 1 threads: 1.109000
*** Time elapsed for 2 threads: 0.677000
*** Time elapsed for 3 threads: 0.496000
*** Time elapsed for 4 threads: 0.394000
*** Time elapsed for 5 threads: 0.324000
*** Time elapsed for 6 threads: 0.287000
*** Time elapsed for 7 threads: 0.247000
*** Time elapsed for 8 threads: 0.234000
*** Time elapsed for 9 threads: 0.242000
*** Time elapsed for 10 threads: 0.239000
*** Time elapsed for 11 threads: 0.241000
*** Time elapsed for 12 threads: 0.235000
*** Time elapsed for 13 threads: 0.226000
*** Time elapsed for 14 threads: 0.214000
*** Time elapsed for 15 threads: 0.235000
*** Time elapsed for 16 threads: 0.218000

With the T0T0...T1T1... (new) partition:

Computing: '((.25*x + .75)*x - 1.5)*x - 2' with 100000000 points
Using numpy:
*** Time elapsed: 3.003
Using numexpr:
*** Time elapsed for 1 threads: 1.106000
*** Time elapsed for 2 threads: 0.617000
*** Time elapsed for 3 threads: 0.442000
*** Time elapsed for 4 threads: 0.345000
*** Time elapsed for 5 threads: 0.296000
*** Time elapsed for 6 threads: 0.257000
*** Time elapsed for 7 threads: 0.237000
*** Time elapsed for 8 threads: 0.260000
*** Time elapsed for 9 threads: 0.245000
*** Time elapsed for 10 threads: 0.261000
*** Time elapsed for 11 threads: 0.238000
*** Time elapsed for 12 threads: 0.210000
*** Time elapsed for 13 threads: 0.218000
*** Time elapsed for 14 threads: 0.200000
*** Time elapsed for 15 threads: 0.235000
*** Time elapsed for 16 threads: 0.198000

In this case, the performance is similar, with perhaps a slight 
advantage for the new partition scheme, but I don't know whether it is 
worth making it the default (probably not, as this partition performs 
clearly worse on non-NUMA machines).  At any rate, both partitions 
perform very close to the aggregate memory bandwidth of the NUMA 
machine (around 10 GB/s in the above case).
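
A quick back-of-the-envelope check of that figure (my own arithmetic; 
it only counts one read of x and one write of the result, so it is a 
lower bound on the real memory traffic):

N = 100 * 1000 * 1000          # points in the benchmark
traffic = 2 * N * 8            # read x + write result, float64, in bytes
best_time = 0.198              # best 16-thread time above, in seconds
print("~%.1f GB/s" % (traffic / best_time / 1e9))   # prints ~8.1 GB/s

Counting temporaries and operand reloads would only push that figure 
higher, so ~10 GB/s of sustained traffic is plausible.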

In general, I don't think there is much point in using Intel's TBB in 
numexpr because the existing implementation already hits memory 
bandwidth limits pretty early (around 10 threads in the latter example).

-- 
Francesc Alted
Index: numexpr/interpreter.c
===================================================================
--- numexpr/interpreter.c	(revision 260)
+++ numexpr/interpreter.c	(working copy)
@@ -59,8 +59,6 @@
 int end_threads = 0;             /* should exisiting threads end? */
 pthread_t threads[MAX_THREADS];  /* opaque structure for threads */
 int tids[MAX_THREADS];           /* ID per each thread */
-intp gindex;                     /* global index for all threads */
-int init_sentinels_done;         /* sentinels initialized? */
 int giveup;                      /* should parallel code giveup? */
 int force_serial;                /* force serial code instead of parallel? */
 int pid = 0;                     /* the PID for this process */
@@ -1072,7 +1070,7 @@
     return 0;
 }
 
-/* VM engine for each threadi (general) */
+/* VM engine for each thread (general) */
 static inline int
 vm_engine_thread(char **mem, intp index, intp block_size,
                   struct vm_params params, int *pc_error)
@@ -1086,11 +1084,11 @@
 /* Do the worker job for a certain thread */
 void *th_worker(void *tids)
 {
-    /* int tid = *(int *)tids; */
-    intp index;                 /* private copy of gindex */
+    int tid = *(int *)tids;
+    intp index;
     /* Parameters for threads */
-    intp start;
-    intp vlen;
+    intp start, stop;
+    intp vlen, nblocks, th_nblocks;
     intp block_size;
     struct vm_params params;
     int *pc_error;
@@ -1103,8 +1101,6 @@
 
     while (1) {
 
-        init_sentinels_done = 0;     /* sentinels have to be initialised yet */
-
         /* Meeting point for all threads (wait for initialization) */
         pthread_mutex_lock(&count_threads_mutex);
         if (count_threads < nthreads) {
@@ -1146,27 +1142,19 @@
             pthread_mutex_unlock(&count_mutex);
         }
 
-        /* Loop over blocks */
-        pthread_mutex_lock(&count_mutex);
-        if (!init_sentinels_done) {
-            /* Set sentinels and other global variables */
-            gindex = start;
-            index = gindex;
-            init_sentinels_done = 1;    /* sentinels have been initialised */
-            giveup = 0;            /* no giveup initially */
-        } else {
-            gindex += block_size;
-            index = gindex;
+        /* Iterate over all the blocks using partition schema:
+           T0 T0 T0 ... T0 T1 T1 ... T1 */
+        nblocks = (vlen / block_size);
+        assert(vlen % block_size == 0);
+        th_nblocks = nblocks / (intp)nthreads;
+        if (nblocks % nthreads > 0) {
+            th_nblocks += 1;
         }
-        pthread_mutex_unlock(&count_mutex);
-        while (index < vlen && !giveup) {
-            if (block_size == BLOCK_SIZE1) {
-                ret = vm_engine_thread1(mem, index, params, pc_error);
-            }
-            else {
-                ret = vm_engine_thread(mem, index, block_size,
-                                       params, pc_error);
-            }
+        index = start + tid * th_nblocks * block_size;
+        stop = start + (tid+1) * th_nblocks * block_size;
+        while (index < stop) {
+            if (index >= vlen) break;
+            ret = vm_engine_thread1(mem, index, params, pc_error);
             if (ret < 0) {
                 pthread_mutex_lock(&count_mutex);
                 giveup = 1;
@@ -1174,10 +1162,8 @@
                 th_params.ret_code = ret;
                 pthread_mutex_unlock(&count_mutex);
             }
-            pthread_mutex_lock(&count_mutex);
-            gindex += block_size;
-            index = gindex;
-            pthread_mutex_unlock(&count_mutex);
+            index += block_size;
+            if (giveup) break;
         }
 
         /* Meeting point for all threads (wait for finalization) */
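
For readers skimming the diff, the per-thread range it computes is 
roughly equivalent to this Python sketch (illustrative only; the 
variable names mirror the patch, but the real code walks raw C 
pointers):

def thread_range(tid, nthreads, start, vlen, block_size):
    nblocks = vlen // block_size        # the patch asserts vlen % block_size == 0
    th_nblocks = nblocks // nthreads
    if nblocks % nthreads:
        th_nblocks += 1                 # round up; the last thread may get fewer blocks
    index = start + tid * th_nblocks * block_size
    stop = start + (tid + 1) * th_nblocks * block_size
    return index, min(stop, vlen)       # the loop also breaks once index >= vlen
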
#######################################################################
# This script compares the speed of computing a polynomial with
# different in-memory paradigms (numpy and numexpr).
#
# Author: Francesc Alted
# Date: 2010-10-01
#######################################################################

from time import time
import numpy as np
import numexpr as ne


#expr = ".25*x**3 + .75*x**2 - 1.5*x - 2"  # the polynomial to compute
expr = "((.25*x + .75)*x - 1.5)*x - 2"   # a computer-friendly polynomial
#expr = "sin(x)**2+cos(x)**2"             # a transcendental function
N = 100*1000*1000            # the number of points at which to evaluate the expression
x = np.linspace(-1, 1, N)   # the x in range [-1, 1]

ne.set_num_threads(2)       # the number of threads for numexpr computations

def compute(what):
    """Compute the polynomial."""
    global expr
    if what == "numpy":
        if "sin" in expr:
            # Trick to let numpy evaluate this
            expr = "np.sin(x)**2+np.cos(x)**2"
        y = eval(expr)
    else:
        y = ne.evaluate(expr)
#        y = ne.evaluate_iter(expr)
    return len(y)


if __name__ == '__main__':
    print "Computing: '%s' with %d points" % (expr, N)
    print "Using numpy:"
    t0 = time()
    result = compute("numpy")
    ts = round(time() - t0, 3)
    print "*** Time elapsed:", ts
    print "Using numexpr:"
    for nthreads in range(1, ne.ncores+1):
        ne.set_num_threads(nthreads)
        t0 = time()
        result = compute("numexpr")
        ts = round(time() - t0, 3)
        print "*** Time elapsed for %d threads: %f" % (nthreads, ts)