[issue34410] Segfault/TimeoutError: itertools.tee of multiprocessing.pool.imap_unordered
Carlo Rosati added the comment: If what you've said is correct, would it make the most sense to create a Manager method which returns a Proxy to a tee'd iterator? -- ___ Python tracker <https://bugs.python.org/issue34410> ___ ___ Python-bugs-list mailing list Unsubscribe: https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com
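The Manager idea above could be sketched like this. It is a minimal sketch under stated assumptions, not a built-in Manager method. It registers a custom class with `multiprocessing.managers.BaseManager.register()`, so the `itertools.tee` branches live inside the manager's server process and every client reaches them through a proxy. `TeeServer`, `TeeManager`, `next_item` and `branch` are hypothetical names and are not part of the standard library. The lock is needed because the manager serves each client connection in its own thread, and tee iterators are not thread-safe.

```python
import itertools
import threading
from multiprocessing.managers import BaseManager


class TeeServer:
    """Hypothetical shared object that owns itertools.tee branches.

    When registered with a manager, it lives in the manager's server
    process, so every client proxy sees the same tee state.
    """

    def __init__(self, iterable, n=2):
        self._branches = itertools.tee(iterable, n)
        # The manager handles each client connection in its own thread,
        # and tee iterators are not thread-safe, so serialize access.
        self._lock = threading.Lock()

    def next_item(self, i):
        # Return (True, value), or (False, None) once branch i is exhausted,
        # so StopIteration never has to travel back through the proxy.
        with self._lock:
            try:
                return True, next(self._branches[i])
            except StopIteration:
                return False, None


class TeeManager(BaseManager):
    """Hypothetical manager subclass that exposes TeeServer via a proxy."""


TeeManager.register("TeeServer", TeeServer)


def branch(server, i):
    """Client-side generator that pulls branch i through the server or its proxy."""
    while True:
        ok, value = server.next_item(i)
        if not ok:
            return
        yield value
```

In a parent process this would be used roughly as `with TeeManager() as m: server = m.TeeServer(range(10), 2)`. The `server` proxy can then be passed to worker processes, each of which iterates `branch(server, i)` for its own index. On platforms that use the spawn start method, this needs the usual `if __name__ == "__main__":` guard. The sketch assumes the source iterable can be pickled and sent to the manager process. A live `imap_unordered` result iterator most likely cannot be, so this alone does not cover the case in the original report.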
Carlo Rosati added the comment: `for i in itertools.count()` in the first implementation I posted should be `while True`. I was using that for debugging.
Carlo Rosati added the comment: I've written a few workarounds that could serve as a multiprocessing-specific tee function, and I'd appreciate feedback/critique on them. Hopefully we can all agree on the best solution. It is unfortunate that the multiprocessing manager does not provide a deque.

The first one I wrote uses a managed list:

    import multiprocessing


    def multiprocessing_tee(iterable, n=2):
        """A multiprocessing-safe itertools.tee built on managed lists."""
        it = iter(iterable)
        m = multiprocessing.Manager()
        mylock = m.Lock()
        lists = [m.list() for _ in range(n)]

        def gen(local_list):
            while True:
                with mylock:
                    if not local_list:  # when the local list is empty
                        try:
                            newval = next(it)  # fetch a new value and
                        except StopIteration:
                            return  # source exhausted; don't let StopIteration escape (PEP 479)
                        for l in lists:  # load it to all the lists
                            l.append(newval)
                    item = local_list.pop(0)
                yield item  # yield after releasing the lock so other consumers aren't blocked

        return tuple(gen(l) for l in lists)

The other two implementations use queues:

    import multiprocessing
    import queue  # named `Queue` on Python 2


    def multiprocessing_tee_q(iterable, n=2):
        """A multiprocessing-safe itertools.tee built on managed queues."""
        it = iter(iterable)
        m = multiprocessing.Manager()
        lock = m.Lock()
        queues = [m.Queue(-1) for _ in range(n)]  # -1 means infinite maxsize (so puts won't block)

        def gen(myqueue):
            while True:
                with lock:  # no one else touches anything
                    try:
                        newval = myqueue.get_nowait()
                    except queue.Empty:
                        try:
                            newval = next(it)
                        except StopIteration:
                            return
                        for q in queues:
                            q.put(newval)
                        newval = myqueue.get()
                yield newval

        return tuple(gen(q) for q in queues)


    class Sentinel(object):
        """Used as the queue sentinel (module level, so it pickles by reference)."""


    def multiprocessing_tee_q2(iterable, n=2):
        """A multiprocessing-safe itertools.tee that only locks when its own queue is empty."""
        it = iter(iterable)
        m = multiprocessing.Manager()
        lock = m.Lock()
        queues = [m.Queue(-1) for _ in range(n)]  # -1 means infinite maxsize (so puts won't block)

        def gen(myqueue):
            while True:
                try:
                    retval = myqueue.get_nowait()
                except queue.Empty:
                    # What happens if another process puts the last item in my queue before I get the lock?
                    with lock:  # no one else touches anything
                        try:
                            newval = next(it)
                        except StopIteration:
                            newval = Sentinel
                        for q in queues:
                            q.put(newval)
                        retval = myqueue.get()
                if retval is Sentinel:
                    return  # `raise StopIteration` here becomes a RuntimeError under PEP 479
                yield retval

        return tuple(gen(q) for q in queues)

These are just sketches; I'm hoping those with more experience can weigh in on these implementations.
Carlo Rosati added the comment: You'll also need to lock when modifying the manager's list. Does anyone know how to do this using the multiprocessing.Queues without deadlocking?
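The locking point above can be illustrated with a minimal in-process sketch. To run without a Manager, it assumes `threading.Lock` as a stand-in for `m.Lock()` and `collections.deque` in place of the managed list `m.list()`. `locked_tee` is a hypothetical name. The empty-buffer check, the fetch from the source and the fan-out to every buffer all happen under one lock, while the `yield` happens only after the lock is released. If the `yield` sat inside the `with` block, the lock would stay held while the generator is suspended. With a non-reentrant lock, even a single thread alternating `next(a)` and `next(b)` would then block forever. The sketch does not address the `multiprocessing.Queue` variant directly.

```python
import threading
from collections import deque


def locked_tee(iterable, n=2):
    """Hypothetical thread-based tee showing where the lock must (and must not) be held.

    threading.Lock and collections.deque stand in for a Manager's Lock and list.
    """
    it = iter(iterable)
    lock = threading.Lock()
    buffers = [deque() for _ in range(n)]

    def gen(local):
        while True:
            with lock:
                # Check-and-fan-out is atomic: no other consumer can append
                # to or pop from any buffer while this lock is held.
                if not local:
                    try:
                        newval = next(it)
                    except StopIteration:
                        return  # source exhausted; leaving the `with` releases the lock
                    for buf in buffers:
                        buf.append(newval)
                item = local.popleft()
            # Yield only after releasing the lock; yielding inside the `with`
            # block would keep it held while this generator is suspended.
            yield item

    return tuple(gen(buf) for buf in buffers)


a, b = locked_tee(range(5))
print([next(a), next(b), next(a), next(b)])  # [0, 0, 1, 1]
```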
Carlo Rosati added the comment: Okay, I needed to use `.pop(0)` instead of `.pop(-1)`, even though `.pop(0)` is probably O(N).
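The ordering issue can be shown with ordinary lists; a managed `m.list()` forwards `pop` to a real list in the manager process. `drain_list_end`, `drain_list_front` and `drain_deque` are hypothetical helpers written for illustration. `pop(-1)` takes from the end, so a branch that has fallen behind would receive its buffered values newest-first. `pop(0)` restores arrival order but has to shift every remaining element, which is where the O(N) cost comes from. `collections.deque.popleft()` is FIFO and O(1), but it only helps for in-process buffers, since the manager does not offer a managed deque out of the box.

```python
from collections import deque


def drain_list_end(buf):
    """pop(-1): O(1) per call, but returns the newest item first (LIFO)."""
    return [buf.pop(-1) for _ in range(len(buf))]


def drain_list_front(buf):
    """pop(0): oldest item first (FIFO), but each call shifts the rest, O(len(buf))."""
    return [buf.pop(0) for _ in range(len(buf))]


def drain_deque(buf):
    """deque.popleft(): oldest item first (FIFO) in O(1) per call."""
    return [buf.popleft() for _ in range(len(buf))]


# Values buffered in arrival order for a branch that fell behind.
arrivals = [10, 20, 30]
print(drain_list_end(list(arrivals)))    # [30, 20, 10]
print(drain_list_front(list(arrivals)))  # [10, 20, 30]
print(drain_deque(deque(arrivals)))      # [10, 20, 30]
```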
Carlo Rosati added the comment: I figured out that the problem is that itertools.tee does not use a multiprocessing.Manager proxied object for its shared state. I was able to create a workaround tee as follows:

    import multiprocessing


    def multiprocessing_tee(iterable, n=2):
        """A multiprocessing tee built on managed lists (no locking yet)."""
        it = iter(iterable)
        m = multiprocessing.Manager()
        lists = [m.list() for _ in range(n)]

        def gen(local_list):
            keep_m_alive = m  # hold a reference so the Manager isn't garbage-collected and shut down
            while True:
                # Unlocked: concurrent consumers can race on this check-and-append.
                if not local_list:  # when the local list is empty
                    try:
                        newval = next(it)  # fetch a new value and
                    except StopIteration:
                        return  # don't let StopIteration escape the generator (PEP 479)
                    for l in lists:  # load it to all the lists
                        l.append(newval)
                yield local_list.pop(0)  # pop(0) keeps FIFO order; pop(-1) would return the newest item first

        return tuple(gen(l) for l in lists)
New submission from Carlo Rosati: Hello, when I run the attached code, I encounter a segmentation fault. Thanks, Carlo

files: 3.py
messages: 323546
nosy: carlorosati
priority: normal
severity: normal
status: open
title: Segfault/TimeoutError: itertools.tee of multiprocessing.pool.imap_unordered
type: crash
versions: Python 2.7, Python 3.7
Added file: https://bugs.python.org/file47750/3.py