Author: Remi Meier Branch: Changeset: r240:06ac9ee33205 Date: 2014-03-25 09:33 +0100 http://bitbucket.org/pypy/benchmarks/changeset/06ac9ee33205/
Log: add some multithreaded benchmarks and a script to run them (optional) diff --git a/multithread/bench.py b/multithread/bench.py new file mode 100644 --- /dev/null +++ b/multithread/bench.py @@ -0,0 +1,116 @@ +#!/usr/bin/python + +import time +import math +import imp, os, sys +import json +import contextlib + +def import_file(filepath): + mod_name, file_ext = os.path.splitext(os.path.split(filepath)[-1]) + return imp.load_source(mod_name, filepath) + + +class DummyFile(object): + def write(self, x): pass + +@contextlib.contextmanager +def nostdout(): + save_stdout = sys.stdout + sys.stdout = DummyFile() + yield + sys.stdout = save_stdout + + +def avg(xs): + return sum(xs) / len(xs) + +def std_dev(xs): + N = len(xs) + mu = avg(xs) + var = sum([(x - mu)**2 for x in xs]) / N + return math.sqrt(var) + +def get_error(times): + ts = sorted(times)[:args.k] + best = float(ts[0]) + + return max((t / best) - 1.0 for t in ts) + +def within_error(args, times): + return get_error(times) < args.error + +def main(args): + basedir = os.path.abspath(os.path.dirname(__file__)) + sys.path.insert(0, basedir+'/') + import common + print __file__ + folder = os.path.dirname(args.file) + os.chdir(folder) + sys.path.insert(0, os.path.abspath('.')) + test = import_file(os.path.basename(args.file)) + + times = [] + k = 1 + try: + while True: + time.sleep(0.2) + if not args.q: + print "Run {}/{}:".format(k, args.k) + + test_time = time.time() + if args.p: + test.run(*args.more) + else: + with nostdout(): + test.run(*args.more) + times.append(time.time() - test_time) + + if not args.q: + print "took {} s".format(times[-1]) + + if k >= args.k: + if within_error(args, times): + break + elif not args.q: + print "error was not within", args.error + + if k > 2 * args.k: + if not args.q: + print "max number of iterations reached", \ + "error still too great, finish anyway" + break + k += 1 + finally: + if not args.q: + print "times:", times + + if times: + times = sorted(times)[:args.k] + result = {'best':min(times), + 'error':get_error(times), + 'std_dev(k)':std_dev(times)} + print json.dumps(result) + + + +if __name__ == '__main__': + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument('-k', default=3, help='K-best K', type=int) + parser.add_argument('-e', '--error', default=0.05, type=float, + help='relative allowed error [0.05]') + parser.add_argument('-q', action='store_const', + const=True, default=False, + help='mute except for best run') + parser.add_argument('-p', action='store_const', + const=True, default=False, + help='print to stdout what the benchmark prints') + parser.add_argument('file', help='file to run') + parser.add_argument('more', nargs="*", help='file.run() arguments') + + args = parser.parse_args() + if not args.q: + print args + main(args) diff --git a/multithread/common/__init__.py b/multithread/common/__init__.py new file mode 100644 diff --git a/multithread/common/abstract_threading.py b/multithread/common/abstract_threading.py new file mode 100644 --- /dev/null +++ b/multithread/common/abstract_threading.py @@ -0,0 +1,119 @@ +from Queue import Queue, Empty, Full +from threading import Thread, Condition, Lock +import thread + +try: + from __pypy__.thread import atomic +except ImportError: + atomic = Lock() + +class Worker(Thread): + """Thread executing tasks from a given tasks queue""" + def __init__(self, queue): + Thread.__init__(self) + self.daemon = True + self.next_task = None + self.cond = Condition() + self.queue = queue + self.start() + + def run(self): + # the next line registers the at_commit_cb on interpreter + # level for this thread. This should be fixed in the + # interpreter (it causes a conflict in stmgcintf.register_at_commit_cb). + # thread.at_commit(lambda : 0, ()) + + while True: + with self.cond: + while self.next_task is None: + self.cond.wait() + + func, args, kargs = self.next_task + self.next_task = None + + try: + func(*args, **kargs) + except Exception as e: + print e + + # first time put in queue by threadpool on creation + try: + self.queue.put_nowait(self) + except Full: + # thread limit reached, I'll show myself out.. + return + + +class ThreadPool(object): + def __init__(self, thread_queue_size=12): + self.threads = Queue(thread_queue_size) + + def add_task(self, func, *args, **kargs): + try: + worker = self.threads.get_nowait() + except Empty: + worker = Worker(self.threads) + + with worker.cond: + worker.next_task = (func, args, kargs) + worker.cond.notify_all() + + + + +import multiprocessing +_thread_pool = ThreadPool(3 * multiprocessing.cpu_count()) + + + + +class Future(object): + def __init__(self, func, *args, **kwargs): + self._done = False + self._result = None + self._exception = None + self._cond = Condition() + + assert hasattr(func, "__call__") + + _thread_pool.add_task(self._task, func, *args, **kwargs) + + + def _task(self, func, *args, **kwargs): + with self._cond: + try: + self._result = func(*args, **kwargs) + except Exception as e: + self._exception = e + finally: + self._done = True + # several points/threads in the program + # may wait for the result (notify_all): + self._cond.notify_all() + + + def __call__(self): + with self._cond: + while not self._done: + self._cond.wait() + + if self._exception: + raise self._exception + + return self._result + + + +class AtomicFuture(Future): + def _task(self, func, *args, **kwargs): + with self._cond: + try: + with atomic: + self._result = func(*args, **kwargs) + except Exception as e: + self._exception = e + finally: + self._done = True + # several points/threads in the program + # may wait for the result (notify_all): + self._cond.notify_all() diff --git a/multithread/mandelbrot/mandelbrot.py b/multithread/mandelbrot/mandelbrot.py new file mode 100644 --- /dev/null +++ b/multithread/mandelbrot/mandelbrot.py @@ -0,0 +1,80 @@ +from common.abstract_threading import Future, atomic +import Image, sys + + +def calculate(a, b, im_size, max_iter=255): + print "a:%s, b:%s, im_size:%s" % (a, b, im_size) + ar, ai = a + br, bi = b + width, height = im_size + imag_step = (bi - ai) / (height - 1) + real_step = (br - ar) / (width - 1) + print "real/width:%s, imag/height:%s" % (real_step, imag_step) + + with atomic: + result = [[0] * width for y in xrange(height)] + for y in xrange(height): + zi = ai + y * imag_step + for x in xrange(width): + zr = ar + x * real_step + z = complex(zr, zi) + c = z + for i in xrange(max_iter): + if abs(z) > 2.0: + break + z = z * z + c + result[y][x] = i + + return result + +def save_img(image, file_name='out.png'): + im = Image.new("RGB", (len(image[0]), len(image))) + out = im.load() + + for y in xrange(len(image)): + for x in xrange(len(image[0])): + c = image[y][x] + out[x,y] = c, c, c + im.save(file_name, 'PNG') + +def save_to_file(image, file_name='out.txt'): + with atomic: + s = "\n".join(map(str, image)) + with open(file_name, 'w') as f: + f.write(s) + + +def merge_imgs(imgs): + res = [] + for img in imgs: + for y in img: + res.append(y) + return res + + +def run(threads=2): + threads = int(threads) + ar, ai = -2.0, -1.5 + br, bi = 1.0, 1.5 + width, height = 4096, 4096 + + step = (bi - ai) / threads + res = [] + ai = -1.5 + bi = ai + step + for i in xrange(threads): + res.append(Future(calculate, + a=(ar, ai + i * step), + b=(br, bi + i * step), + im_size=(width, int(height / threads)) + )) + + res = [f() for f in res] + return merge_imgs(res) + + + +if __name__ == '__main__': + image = run(int(sys.argv[1])) + save_to_file(image) + # save_img(image) don't run on STM, allocates 4000GB of memory diff --git a/multithread/raytrace/raytrace.py b/multithread/raytrace/raytrace.py new file mode 100644 --- /dev/null +++ b/multithread/raytrace/raytrace.py @@ -0,0 +1,190 @@ +# From http://www.reddit.com/r/tinycode/comments/169ri9/ray_tracer_in_140_sloc_of_python_with_picture/ +# Date: 14.03.2013 + +from math import sqrt, pow, pi +from common.abstract_threading import atomic, Future +import time + +AMBIENT = 0.1 + + + +class Vector(object): + def __init__(self,x,y,z): + self.x = x + self.y = y + self.z = z + + def dot(self, b): + return self.x*b.x + self.y*b.y + self.z*b.z + + def cross(self, b): + return (self.y*b.z-self.z*b.y, self.z*b.x-self.x*b.z, self.x*b.y-self.y*b.x) + + def magnitude(self): + return sqrt(self.x*self.x+self.y*self.y+self.z*self.z) + + def normal(self): + mag = self.magnitude() + return Vector(self.x/mag,self.y/mag,self.z/mag) + + def __add__(self, b): + return Vector(self.x + b.x, self.y+b.y, self.z+b.z) + + def __sub__(self, b): + return Vector(self.x-b.x, self.y-b.y, self.z-b.z) + + def __mul__(self, b): + #assert type(b) == float or type(b) == int + return Vector(self.x*b, self.y*b, self.z*b) + + +class Sphere(object): + def __init__(self, center, radius, color): + self.c = center + self.r = radius + self.col = color + + def intersection(self, l): + q = l.d.dot(l.o - self.c)**2 - (l.o - self.c).dot(l.o - self.c) + self.r**2 + if q < 0: + return Intersection( Vector(0,0,0), -1, Vector(0,0,0), self) + else: + d = -l.d.dot(l.o - self.c) + d1 = d - sqrt(q) + d2 = d + sqrt(q) + if 0 < d1 and ( d1 < d2 or d2 < 0): + return Intersection(l.o+l.d*d1, d1, self.normal(l.o+l.d*d1), self) + elif 0 < d2 and ( d2 < d1 or d1 < 0): + return Intersection(l.o+l.d*d2, d2, self.normal(l.o+l.d*d2), self) + else: + return Intersection( Vector(0,0,0), -1, Vector(0,0,0), self) + + def normal(self, b): + return (b - self.c).normal() + + +class Plane(object): + def __init__(self, point, normal, color): + self.n = normal + self.p = point + self.col = color + + def intersection(self, l): + d = l.d.dot(self.n) + if d == 0: + return Intersection( Vector(0,0,0), -1, Vector(0,0,0), self) + else: + d = (self.p - l.o).dot(self.n) / d + return Intersection(l.o+l.d*d, d, self.n, self) + + +class Ray(object): + def __init__(self, origin, direction): + self.o = origin + self.d = direction + + +class Intersection(object): + def __init__(self, point, distance, normal, obj): + self.p = point + self.d = distance + self.n = normal + self.obj = obj + + +def testRay(ray, objects, ignore=None): + intersect = Intersection( Vector(0,0,0), -1, Vector(0,0,0), None) + + for obj in objects: + if obj is not ignore: + currentIntersect = obj.intersection(ray) + if currentIntersect.d > 0 and intersect.d < 0: + intersect = currentIntersect + elif 0 < currentIntersect.d < intersect.d: + intersect = currentIntersect + return intersect + + +def trace(ray, objects, light, maxRecur): + if maxRecur < 0: + return (0,0,0) + intersect = testRay(ray, objects) + if intersect.d == -1: + col = Vector(AMBIENT,AMBIENT,AMBIENT) + elif intersect.n.dot(light - intersect.p) < 0: + col = intersect.obj.col * AMBIENT + else: + lightRay = Ray(intersect.p, (light-intersect.p).normal()) + if testRay(lightRay, objects, intersect.obj).d == -1: + lightIntensity = 1000.0/(4*pi*(light-intersect.p).magnitude()**2) + col = intersect.obj.col * max(intersect.n.normal().dot((light - intersect.p).normal()*lightIntensity), AMBIENT) + else: + col = intersect.obj.col * AMBIENT + return col + + + +tasks = 0 +def task(x, h, cameraPos, objs, lightSource): + # force a transaction break here (STM not yet smart enough + # to figure out that it should break here) + time.sleep(0) + + with atomic: + for y in range(h): + ray = Ray(cameraPos, + (Vector(x/50.0-5,y/50.0-5,0)-cameraPos).normal()) + trace(ray, objs, lightSource, 10) + + # force a transaction break. updating a global var should + # be done in a separate transaction: + time.sleep(0) + + global tasks + with atomic: + tasks -= 1 + time.sleep(0) + +futures = [] +def future_dispatcher(ths, *args): + global tasks + + while tasks >= ths: + time.sleep(0) + + with atomic: + tasks += 1 + + futures.append(Future(task, *args)) + time.sleep(0) + + + + +def run(ths=8, w=1024, h=1024): + ths = int(ths) + w = int(w) + h = int(h) + + objs = [] + objs.append(Sphere( Vector(-2,0,-10), 2, Vector(0,255,0))) + objs.append(Sphere( Vector(2,0,-10), 3.5, Vector(255,0,0))) + objs.append(Sphere( Vector(0,-4,-10), 3, Vector(0,0,255))) + objs.append(Plane( Vector(0,0,-12), Vector(0,0,1), Vector(255,255,255))) + lightSource = Vector(0,10,0) + + cameraPos = Vector(0,0,20) + + for x in range(w): + print x + future_dispatcher(ths, x, h, cameraPos, objs, lightSource) + + for f in futures: + f() + del futures[:] + assert tasks == 0 + + +if __name__ == '__main__': + run() _______________________________________________ pypy-commit mailing list pypy-commit@python.org https://mail.python.org/mailman/listinfo/pypy-commit