> On Apr 6, 2020, at 12:27 PM, David Raymond <david.raym...@tomtom.com> wrote:
> 
> Looks like this will get what you need.
> 
> 
> import multiprocessing as mp
> 
> def some_complex_function(x):
>    global q
>    #stuff using q
> 
> def pool_init(q2):
>    #Runs once in each worker process; stores the inherited Queue
>    global q
>    q = q2
> 
> def main():
>    #initialize the Queue
>    mp_comm_queue = mp.Queue()
> 
>    #Set up a pool to process a bunch of stuff in parallel
>    pool = mp.Pool(initializer=pool_init, initargs=(mp_comm_queue,))
>    ...
> 
> 

Gotcha, thanks. I’ll look more into that initializer argument and see how I can 
leverage it to do multiprocessing using spawn rather than fork in the future. 
Looks straightforward enough. Thanks again!
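
For the archive, here is a minimal sketch of how the initializer approach 
could look with the spawn start method requested explicitly. It assumes the 
workers only need to put() items on the queue; the names _queue, pool_init 
arguments, and the pool size of 2 are illustrative, not taken from any real 
project.

```python
import multiprocessing as mp

_queue = None  # hypothetical module-level slot, filled in by pool_init


def pool_init(q):
    """Run once in every worker process; keep a reference to the queue."""
    global _queue
    _queue = q


def some_complex_function(x):
    _queue.put(x)  # report back to the parent over the shared queue
    return x * 2


def main():
    # Ask for spawn explicitly so the behavior is the same on every platform.
    ctx = mp.get_context("spawn")
    comm_queue = ctx.Queue()
    with ctx.Pool(processes=2, initializer=pool_init,
                  initargs=(comm_queue,)) as pool:
        results = list(pool.imap(some_complex_function, range(5)))
        # Drain the queue while the workers are still alive.
        reported = sorted(comm_queue.get(timeout=5) for _ in range(5))
    return results, reported


if __name__ == "__main__":
    print(main())
```

The queue travels to the workers as part of process start-up (initargs), not 
as part of each task, which is why it avoids the "through inheritance" error.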

---
Israel Brewster
Software Engineer
Alaska Volcano Observatory 
Geophysical Institute - UAF 
2156 Koyukuk Drive 
Fairbanks AK 99775-7320
Work: 907-474-5172
cell:  907-328-9145

> 
> -----Original Message-----
> From: David Raymond 
> Sent: Monday, April 6, 2020 4:19 PM
> To: python-list@python.org
> Subject: RE: Multiprocessing queue sharing and python3.8
> 
> Attempting reply as much for my own understanding.
> 
> Are you on Mac? I think this is the pertinent bit for you:
> Changed in version 3.8: On macOS, the spawn start method is now the default. 
> The fork start method should be considered unsafe as it can lead to crashes 
> of the subprocess. See bpo-33725.
> 
> When you start a new process (with the spawn method) it runs the module just 
> like it's being imported. So your global " mp_comm_queue2=mp.Queue()" creates 
> a new Queue in each process. Your initialization of mp_comm_queue is also 
> done inside the main() function, which doesn't get run in each process. So 
> each process in the Pool is going to have mp_comm_queue as None, and have its 
> own version of mp_comm_queue2. The ID being the same or different is the 
> result of one or more processes in the Pool being used repeatedly for the 
> multiple steps in imap, probably because the function that the Pool is 
> executing finishes so quickly.
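
The re-import behavior described above can be observed directly. This is a 
minimal sketch, assuming the script is run as a file; IMPORT_PID and 
module_was_rerun are hypothetical names chosen for illustration.

```python
import multiprocessing as mp
import os

# Module-level code: with spawn, this line runs again in every child process.
IMPORT_PID = os.getpid()


def module_was_rerun(_):
    # True when the module-level code above ran in *this* process,
    # i.e. the module was re-executed here and not inherited from the parent.
    return IMPORT_PID == os.getpid()


def main():
    ctx = mp.get_context("spawn")
    with ctx.Pool(processes=2) as pool:
        return pool.map(module_was_rerun, range(4))


if __name__ == "__main__":
    print(main())
```

Under spawn every worker should report True, because each one rebuilds its 
own module-level state; anything assigned only inside main() never reaches 
the workers.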
> 
> Add a little extra info to the print calls (and/or set up logging to stdout 
> with the process name/id included) and you can see some of this. Here are 
> the hacked-together changes I made for that.
> 
> import multiprocessing as mp
> import os
> 
> mp_comm_queue = None #Will be initialized in the main function
> mp_comm_queue2 = mp.Queue() #Test pre-initialized as well
> 
> def some_complex_function(x):
>    print("proc id", os.getpid())
>    print("mp_comm_queue", mp_comm_queue)
>    print("queue2 id", id(mp_comm_queue2))
>    mp_comm_queue2.put(x)
>    print("queue size", mp_comm_queue2.qsize())
>    print("x", x)
>    return x * 2
> 
> def main():
>    global mp_comm_queue
>    #initialize the Queue
>    mp_comm_queue = mp.Queue()
> 
>    #Set up a pool to process a bunch of stuff in parallel
>    pool = mp.Pool()
>    values = range(20)
>    data = pool.imap(some_complex_function, values)
> 
>    for val in data:
>        print(f"**{val}**")
>    print("final queue2 size", mp_comm_queue2.qsize())
> 
> if __name__ == "__main__":
>    main()
> 
> 
> 
> When making your own Process object and starting it, the Queue should be 
> passed into the function as an argument, yes. The error text seems to be part 
> of the Pool implementation, which I'm not familiar enough with to know the 
> best way to handle it. (Probably something using the "initializer" and 
> "initargs" arguments for Pool, maybe.)
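
For the plain Process case mentioned above, a minimal sketch might look like 
the following. The worker and run_workers names are hypothetical, and the 
spawn context is requested explicitly as an assumption about the setup.

```python
import multiprocessing as mp


def worker(q, x):
    # The queue arrives as an ordinary argument to the target function.
    q.put(x * 2)


def run_workers(values):
    ctx = mp.get_context("spawn")
    q = ctx.Queue()
    procs = [ctx.Process(target=worker, args=(q, v)) for v in values]
    for p in procs:
        p.start()
    # Collect one result per process before joining.
    results = sorted(q.get(timeout=10) for _ in procs)
    for p in procs:
        p.join()
    return results


if __name__ == "__main__":
    print(run_workers([1, 2, 3]))
```

This works because the arguments of a Process are handed over while the 
child is being started, which is the one point where a Queue is allowed to 
be transferred.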
> 
> 
> 
> -----Original Message-----
> From: Python-list <python-list-bounces+david.raymond=tomtom....@python.org> 
> On Behalf Of Israel Brewster
> Sent: Monday, April 6, 2020 1:24 PM
> To: Python <python-list@python.org>
> Subject: Multiprocessing queue sharing and python3.8
> 
> Under python 3.7 (and all previous versions I have used), the following code 
> works properly, and produces the expected output:
> 
> import multiprocessing as mp
> 
> mp_comm_queue = None #Will be initialized in the main function
> mp_comm_queue2=mp.Queue() #Test pre-initialized as well
> 
> def some_complex_function(x):
>    print(id(mp_comm_queue2))
>    assert(mp_comm_queue is not None)
>    print(x)    
>    return x*2
> 
> def main():
>    global mp_comm_queue
>    #initialize the Queue
>    mp_comm_queue=mp.Queue()
> 
>    #Set up a pool to process a bunch of stuff in parallel
>    pool=mp.Pool()
>    values=range(20)
>    data=pool.imap(some_complex_function,values)
> 
>    for val in data:
>        print(f"**{val}**")
> 
> if __name__=="__main__":
>    main()
> 
> - mp_comm_queue2 has the same ID for all iterations of some_complex_function, 
> and the assert passes (mp_comm_queue is not None). However, under python 3.8, 
> it fails - mp_comm_queue2 is a *different* object for each iteration, and the 
> assert fails. 
> 
> So what am I doing wrong with the above example block? Assuming that it broke 
> in 3.8 because I wasn’t sharing the Queue properly, what is the proper way to 
> share a Queue object among multiple processes for the purposes of 
> inter-process communication?
> 
> The documentation 
> (https://docs.python.org/3.8/library/multiprocessing.html#exchanging-objects-between-processes)
> appears to indicate that I should pass the queue as an argument to the 
> function to be executed in parallel. However, that fails as well (on ALL 
> versions of python I have tried) with the error:
> 
> Traceback (most recent call last):
>   File "test_multi.py", line 32, in <module>
>     main()
>   File "test_multi.py", line 28, in main
>     for val in data:
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 748, in next
>     raise value
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 431, in _handle_tasks
>     put(task)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 206, in send
>     self._send_bytes(_ForkingPickler.dumps(obj))
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
>     cls(buf, protocol).dump(obj)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 58, in __getstate__
>     context.assert_spawning(self)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 356, in assert_spawning
>     ' through inheritance' % type(obj).__name__
> RuntimeError: Queue objects should only be shared between processes through inheritance
> 
> after I add the following to the code to try passing the queue rather than 
> having it global:
> 
> #Try by passing queue
> values=[(x,mp_comm_queue) for x in range(20)]
> data=pool.imap(some_complex_function,values)
> for val in data:
>    print(f"**{val}**")   
> 
> So if I can’t pass it as an argument, and having it global is incorrect (at 
> least starting with 3.8), what is the proper method of getting 
> multiprocessing queues to child processes?
> 
> ---
> Israel Brewster
> Software Engineer
> Alaska Volcano Observatory 
> Geophysical Institute - UAF 
> 2156 Koyukuk Drive 
> Fairbanks AK 99775-7320
> Work: 907-474-5172
> cell:  907-328-9145
> 
> -- 
> https://mail.python.org/mailman/listinfo/python-list
