[issue34410] Segfault/TimeoutError: itertools.tee of multiprocessing.pool.imap_unordered

2018-08-21 Thread Carlo Rosati


Carlo Rosati  added the comment:

If what you've said is correct, would it make the most sense to create a 
Manager method which returns a Proxy to a tee'd iterator?
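A manager-side tee along these lines might look like the following sketch. It is not an existing API: `SharedTee`, `TeeManager`, `TeeIteratorProxy`, the `branch` method and the `'TeeIterator'` typeid are hypothetical names. The idea is that `itertools.tee` runs entirely inside the manager's server process and clients only hold proxies. The tee is guarded by a lock because the server handles each connection in its own thread and tee iterators are not thread-safe. This loosely mirrors how the standard `SyncManager` returns an `Iterator` proxy from a managed pool's `imap_unordered`.

```python
import itertools
import threading
from multiprocessing.managers import BaseManager, BaseProxy


class TeeIteratorProxy(BaseProxy):
    """Client-side proxy that forwards next() to an iterator in the manager."""
    _exposed_ = ('__next__',)

    def __iter__(self):
        return self

    def __next__(self):
        # A StopIteration raised in the server is re-raised here, ending loops.
        return self._callmethod('__next__')


class SharedTee:
    """Hypothetical server-side object: tees one iterable into n branches."""

    def __init__(self, iterable, n=2):
        self._lock = threading.Lock()  # tee iterators are not thread-safe
        self._branches = [self._locked(b) for b in itertools.tee(iterable, n)]

    def _locked(self, branch):
        while True:
            with self._lock:  # hold the lock only while touching tee state
                try:
                    item = next(branch)
                except StopIteration:
                    return
            yield item

    def branch(self, i):
        return self._branches[i]


class TeeManager(BaseManager):
    pass


# 'branch' returns a server-side generator, so give the client a proxy to it.
TeeManager.register('SharedTee', SharedTee,
                    method_to_typeid={'branch': 'TeeIterator'})
TeeManager.register('TeeIterator', proxytype=TeeIteratorProxy,
                    create_method=False)


def demo():
    with TeeManager() as manager:
        shared = manager.SharedTee(range(5), 2)
        first, second = shared.branch(0), shared.branch(1)
        return list(first), list(second)


if __name__ == '__main__':
    print(demo())  # ([0, 1, 2, 3, 4], [0, 1, 2, 3, 4])
```

Proxies are picklable, so in principle `first` and `second` could be handed to worker processes. Each `next()` then becomes a round trip to the manager, which should be correct but slow.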

--

___
Python tracker 
<https://bugs.python.org/issue34410>
___
___
Python-bugs-list mailing list
Unsubscribe: 
https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com



2018-08-16 Thread Carlo Rosati


Carlo Rosati  added the comment:

`for i in itertools.count()` in the first implementation I posted should be 
`while True`. I was using that for debugging.




2018-08-16 Thread Carlo Rosati


Carlo Rosati  added the comment:

I've written a few workarounds that I'd propose as a multiprocessing-specific 
tee function, and I'd like feedback and critique on them. Hopefully we can all 
agree on the best solution. It is unfortunate that the multiprocessing manager 
does not provide a deque.
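On the missing deque: a manager can serve any type that is registered with it. The following is a minimal sketch. It assumes a hypothetical `DequeManager` subclass of `BaseManager` that exposes `collections.deque` through the default auto-generated proxy. Dunder methods such as `__len__` are not exposed by default, so this sketch detects emptiness from the `IndexError` raised by `popleft()`.

```python
import collections
from multiprocessing.managers import BaseManager


class DequeManager(BaseManager):
    """Hypothetical manager that serves collections.deque objects."""


# The default AutoProxy exposes the deque's public methods (append, popleft, ...).
DequeManager.register('deque', collections.deque)


def demo():
    with DequeManager() as manager:
        shared = manager.deque()
        shared.append(1)
        shared.append(2)
        shared.appendleft(0)
        drained = []
        while True:
            try:
                drained.append(shared.popleft())  # O(1) in the server process
            except IndexError:  # raised in the server, re-raised here
                break
        return drained


if __name__ == '__main__':
    print(demo())  # [0, 1, 2]
```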

The first one I wrote uses a managed list. 

import itertools
import multiprocessing

def multiprocessing_tee(iterable, n=2):
    """A multiprocessing-safe itertools.tee built on managed lists."""
    it = iter(iterable)
    m = multiprocessing.Manager()
    mylock = m.Lock()
    lists = [m.list() for i in range(n)]
    def gen(local_list):
        for i in itertools.count():
            with mylock:
                if not local_list:  # when the local list is empty,
                    try:
                        newval = next(it)  # fetch a new value and
                    except StopIteration:
                        return  # end cleanly (PEP 479)
                    for l in lists:  # load it into all the lists
                        l.append(newval)
                item = local_list.pop(0)
            yield item  # yield after releasing the lock so other branches aren't blocked
    return tuple(gen(l) for l in lists)

The other two implementations use queues.

import multiprocessing
import queue  # Python 3 name; on Python 2 the module is called Queue

def multiprocessing_tee_q(iterable, n=2):
    """A multiprocessing-safe itertools.tee built on managed queues."""
    it = iter(iterable)
    m = multiprocessing.Manager()
    lock = m.Lock()
    queues = [m.Queue(-1) for _ in range(n)]  # maxsize <= 0 means unbounded, so puts won't block
    def gen(myqueue):
        while True:
            with lock:  # no one else touches anything
                try:
                    newval = myqueue.get_nowait()
                except queue.Empty:
                    try:
                        newval = next(it)
                    except StopIteration:
                        return  # end cleanly (PEP 479)
                    for q in queues:
                        q.put(newval)
                    newval = myqueue.get()
            yield newval
    return tuple(gen(q) for q in queues)

import multiprocessing
import queue  # Python 3 name; on Python 2 the module is called Queue

class Sentinel(object):
    """Used as the end-of-data marker in each queue."""

def multiprocessing_tee_q2(iterable, n=2):
    """A multiprocessing-safe itertools.tee built on managed queues."""
    it = iter(iterable)
    m = multiprocessing.Manager()
    lock = m.Lock()
    queues = [m.Queue(-1) for _ in range(n)]  # maxsize <= 0 means unbounded, so puts won't block
    def gen(myqueue):
        while True:
            try:
                retval = myqueue.get_nowait()
            except queue.Empty:
                # What happens if the other process puts the last item in my queue before I get the lock?
                with lock:  # no one else touches anything
                    try:
                        newval = next(it)
                    except StopIteration:
                        newval = Sentinel
                    for q in queues:
                        q.put(newval)
                retval = myqueue.get()
            if retval is Sentinel:
                return  # "raise StopIteration" here would become RuntimeError (PEP 479)
            yield retval
    return tuple(gen(q) for q in queues)

I'm just throwing out my sketches here; I'm hoping those with more experience 
can weigh in on these implementations.
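Something worth checking in all of these sketches on Python 3.7+ is PEP 479. A `StopIteration` that escapes a generator body, whether it comes from a bare `next(it)` or an explicit `raise StopIteration`, is turned into `RuntimeError`. Here is a small illustration. The function names are made up for this example.

```python
def leaky_copy(iterable):
    it = iter(iterable)
    while True:
        yield next(it)  # StopIteration escapes the generator body


def safe_copy(iterable):
    it = iter(iterable)
    while True:
        try:
            value = next(it)
        except StopIteration:
            return  # ends the generator cleanly
        yield value


def leaky_result():
    try:
        return list(leaky_copy([1, 2]))
    except RuntimeError as exc:
        return 'RuntimeError: %s' % exc


print(list(safe_copy([1, 2])))  # [1, 2]
print(leaky_result())  # RuntimeError: generator raised StopIteration
```

The usual fix is to catch `StopIteration` around `next(it)` and use `return` in place of `raise StopIteration`.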




2018-08-15 Thread Carlo Rosati


Carlo Rosati  added the comment:

You'll also need to lock when modifying the manager's list. Does anyone know 
how to do this with multiprocessing queues without deadlocking?
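On the deadlock question, one general pitfall applies whether lists or queues are used: suspending a generator while it still holds the lock. Until that consumer asks for its next item, every other branch blocks. The following is a single-process sketch. It assumes `threading.Lock` as a stand-in for the manager's `m.Lock()` and uses an `acquire` timeout so the demonstration does not actually hang. All function names are hypothetical.

```python
import threading

lock = threading.Lock()  # stand-in for a manager Lock() proxy


def yields_inside_lock(values):
    for value in values:
        with lock:
            yield value  # suspended here while still holding the lock


def yields_outside_lock(values):
    for value in values:
        with lock:
            item = value  # lock only the shared-state access
        yield item


def other_branch_can_proceed(gen_func):
    gen = gen_func([1, 2])
    next(gen)  # one consumer takes an item and goes off to process it
    acquired = lock.acquire(timeout=0.1)  # another branch tries to read
    if acquired:
        lock.release()
    gen.close()  # GeneratorExit unwinds any pending "with lock:" block
    return acquired


print(other_branch_can_proceed(yields_inside_lock))   # False
print(other_branch_can_proceed(yields_outside_lock))  # True
```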




2018-08-15 Thread Carlo Rosati


Carlo Rosati  added the comment:

Okay, I needed to use .pop(0) instead of .pop(-1), even though .pop(0) is probably O(N).
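The order matters because each list acts as a per-branch FIFO buffer, so items must come out in the order `next(it)` produced them. `pop(-1)` is LIFO. `pop(0)` is FIFO but shifts every remaining element, hence O(N). On a managed list that cost is paid in the manager process. For a local buffer, `collections.deque.popleft()` gives FIFO order in O(1). Here is a plain-Python illustration:

```python
from collections import deque

produced = [10, 20, 30]  # items in the order next(it) produced them

lifo = list(produced)
lifo_order = [lifo.pop(-1) for _ in range(len(produced))]  # [30, 20, 10]: reversed

fifo = list(produced)
fifo_order = [fifo.pop(0) for _ in range(len(produced))]  # [10, 20, 30], each pop O(N)

buffer = deque(produced)
deque_order = [buffer.popleft() for _ in range(len(produced))]  # [10, 20, 30], each pop O(1)
```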




2018-08-15 Thread Carlo Rosati


Carlo Rosati  added the comment:

I figured out that the problem is that itertools.tee does not use a 
multiprocessing.Manager proxy for its shared state. I was able to create a 
workaround tee as follows:

import multiprocessing

def multiprocessing_tee(iterable, n=2):
    """A multiprocessing-safe itertools.tee built on managed lists."""
    it = iter(iterable)
    m = multiprocessing.Manager()
    lists = [m.list() for i in range(n)]
    def gen(local_list):
        keep_m_alive = m  # keep a reference so the manager isn't garbage-collected
        while True:
            if not local_list:  # when the local list is empty
                try:
                    newval = next(it)  # fetch a new value and
                except StopIteration:
                    return  # end cleanly (PEP 479)
                for l in lists:  # load it into all the lists
                    l.append(newval)
            yield local_list.pop(-1)
    return tuple(gen(l) for l in lists)
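The underlying point is that ordinary Python objects are copied rather than shared across processes, while a manager proxy forwards every operation to one server-side object. A small sketch makes this visible. It does not involve `tee` itself, and the names `append_marker` and `demo` are illustrative.

```python
import multiprocessing


def append_marker(target):
    target.append('from child')  # runs in the child process


def demo():
    plain = []
    child = multiprocessing.Process(target=append_marker, args=(plain,))
    child.start()
    child.join()  # the child mutated its own copy of `plain`

    with multiprocessing.Manager() as manager:
        shared = manager.list()
        child = multiprocessing.Process(target=append_marker, args=(shared,))
        child.start()
        child.join()  # the child mutated the list held by the manager
        return plain, list(shared)


if __name__ == '__main__':
    print(demo())  # ([], ['from child'])
```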




2018-08-14 Thread Carlo Rosati


New submission from Carlo Rosati :

Hello, 

When I run the attached code, I encounter a segmentation fault.

Thanks,
Carlo

--
files: 3.py
messages: 323546
nosy: carlorosati
priority: normal
severity: normal
status: open
title: Segfault/TimeoutError: itertools.tee of multiprocessing.pool.imap_unordered
type: crash
versions: Python 2.7, Python 3.7
Added file: https://bugs.python.org/file47750/3.py
