Hello everyone,

This time I decided to measure the communication overhead of multithreaded / multiprocess programs. The results are rather disappointing, that is, the communication overhead seems to be very high. In each of the functions below, I send 10,000 numbers to a plain function / 10 threads / 10 processes, which simply pass them back in their respective ways.


Function: notfun            Best: 0.00622 sec   Average: 0.00633 sec
(simple function)

Function: threadsemfun      Best: 0.64428 sec   Average: 0.64791 sec
(10 threads synchronizing using semaphore)

Function: threadlockfun     Best: 0.66288 sec   Average: 0.66453 sec
(10 threads synchronizing using locks)

Function: procqueuefun      Best: 1.16291 sec   Average: 1.17217 sec
(10 processes communicating with main process using queues)

Function: procpoolfun       Best: 1.18648 sec   Average: 1.19577 sec
(a pool of 10 processes)

That works out to roughly 65 µs per number for the threaded versions and about 120 µs per number for the process versions, versus well under a microsecond per number for the plain function. If I'm doing something wrong in the code below (something that would hurt performance), please point it out.

Code:

import threading
import multiprocessing
import time
import timeit


def time_fun(fun):
        # Time fun with timeit: 10 repeats of one call each, report best and average.
        t = timeit.Timer(stmt=fun, setup="from __main__ import " + fun.__name__)
        results = t.repeat(repeat=10, number=1)
        best_result = min(results)
        avg = sum(results) / len(results)
        print "Function: %-15s Best: %5.5f sec Average: %5.5f sec" % (fun.__name__, best_result, avg)


def notfun():
        # Baseline: no threads or processes, just pop/append in a loop.
        inputlist = range(0,10000)
        reslist = []
        for x in range(len(inputlist)):
                reslist.append(inputlist.pop())

def threadsemfun():
        # 10 threads pop numbers from a shared input list and append them to
        # reslist, holding a Semaphore around each pop/append.
        def tcalc(sem, inputlist, reslist, tid, activitylist):
                while len(inputlist) > 0:
                        sem.acquire()
                        try:
                                x = inputlist.pop()
                        except IndexError:
                                sem.release()
                                return
                        #activitylist[tid] += 1
                        reslist.append(x)
                        sem.release()
        inputlist = range(0,10000)
        #print "before: ", sum(inputlist)
        reslist = []
        tlist = []
        activitylist = [ 0 for x in range(0,10) ]
        sem = threading.Semaphore()
        for t in range(0,10):
                tlist.append(threading.Thread(target=tcalc, args=(sem, inputlist, reslist, t, activitylist)))
        for t in tlist:
                t.start()
        for t in tlist:
                t.join()
        #print "after: ", sum(reslist)
        #print "thread action count:", activitylist


def threadlockfun():
        # Same as threadsemfun, but the 10 threads synchronize with a Lock.
        def tcalc(lock, inputlist, reslist, tid, activitylist):
                while True:
                        lock.acquire()
                        if len(inputlist) == 0:
                                lock.release()
                                return
                        x = inputlist.pop()
                        reslist.append(x)
                        #activitylist[tid] += 1
                        lock.release()
        inputlist = range(0,10000)
        #print "before: ", sum(inputlist)
        reslist = []
        tlist = []
        activitylist = [ 0 for x in range(0,10) ]
        lock = threading.Lock()
        for t in range(0,10):
                tlist.append(threading.Thread(target=tcalc, args=(lock, inputlist, reslist, t, activitylist)))
        for t in tlist:
                t.start()
        for t in tlist:
                t.join()
        #print "after: ", sum(reslist)
        #print "thread action count:", activitylist

def pf(x):
        # Pool worker: just echo the argument back.
        return x

def procpoolfun():
        # A pool of 10 processes; the list is fed to the pool 10 items at a
        # time with map_async(), waiting for each batch before sending the next.
        pool = multiprocessing.Pool(processes=10)
        inputlist = range(0,10000)
        reslist = []
        i, j, jmax = 0, 10, len(inputlist)
        #print "before: ", sum(inputlist)
        while j <= jmax:
                res = pool.map_async(pf, inputlist[i:j])
                reslist.extend(res.get())
                i += 10
                j += 10
        # Shut the workers down so each timing run doesn't leak 10 processes.
        pool.close()
        pool.join()
        #print "after: ", sum(reslist)

def procqueuefun():
        # 10 processes echo numbers back to the main process through a pair of
        # Queues, 10 numbers in flight at a time.
        def pqf(qin, qout):
                pid = multiprocessing.current_process().pid
                while True:
                        x = qin.get()
                        if x == 'STOP':
                                return
                        qout.put((pid, x))
        qin = multiprocessing.Queue()
        qout = multiprocessing.Queue()
        plist = []
        activity = dict()
        for i in range(0,10):
                p = multiprocessing.Process(target = pqf, args=(qin, qout))
                p.start()
                plist.append(p)
                activity[p.pid] = 0
        inputlist = range(0,10000)
        reslist = []
        #print "before:", sum(inputlist)
        ilen = len(inputlist)
        x = 0
        while x != ilen:
                for i in range(0,10):
                        qin.put(inputlist[x+i])
                for i in range(0,10):
                        pid, res = qout.get()
                        #activity[pid] = activity[pid] + 1
                        reslist.append(res)
                x += 10
        for i in range(0,10):
                qin.put('STOP')
        for i in range(len(plist)):
                plist[i].join()

        #print "after:", sum(reslist)
        #print "activity", activity

if __name__ == "__main__":
        time_fun(notfun)
        time_fun(threadsemfun)
        time_fun(threadlockfun)
        time_fun(procqueuefun)
        time_fun(procpoolfun)
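
For the pool case, a large part of the time is presumably spent in the 1,000
separate map_async()/get() round trips rather than in moving the numbers
themselves. Here is a sketch, not included in the timings above (the name
procpoolfun2 is just a placeholder), that hands the whole list to the pool in
one blocking map() call so the dispatch/collect overhead is paid once:

def procpoolfun2():
        # Same workload as procpoolfun, but a single map() over the whole
        # list; the pool chops it into chunks internally.
        pool = multiprocessing.Pool(processes=10)
        inputlist = range(0,10000)
        reslist = pool.map(pf, inputlist)
        pool.close()
        pool.join()
        #print "after: ", sum(reslist)

It can be timed the same way as the others by adding time_fun(procpoolfun2)
to the main block.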


