> On Apr 6, 2020, at 12:19 PM, David Raymond <[email protected]> wrote:
>
> Attempting reply as much for my own understanding.
>
> Are you on Mac? I think this is the pertinent bit for you:
> Changed in version 3.8: On macOS, the spawn start method is now the default.
> The fork start method should be considered unsafe as it can lead to crashes
> of the subprocess. See bpo-33725.
Ahhh, yep, that would do it! Using spawn rather than fork completely explains
all the issues I was suddenly seeing. It didn't even occur to me that the OS I
was running on might make a difference. And yes, forcing it back to using fork
does indeed “fix” the issue. Of course, as noted there, the fork start method
should be considered unsafe, so I guess I get to re-architect everything I do
with multiprocessing that relies on data sharing between processes. The Queue
example was just a minimal working example that illustrated the behavioral
differences I was seeing :-) Thanks for the pointer!
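
(For the record, “forcing it back” just means setting the start method
explicitly at the entry point - a minimal sketch, with the caveat that it
re-introduces the unsafe fork behavior that bpo-33725 describes:)

import multiprocessing as mp

if __name__ == "__main__":
    # Revert to the pre-3.8 macOS default; unsafe per bpo-33725
    mp.set_start_method("fork")
    main()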
---
Israel Brewster
Software Engineer
Alaska Volcano Observatory
Geophysical Institute - UAF
2156 Koyukuk Drive
Fairbanks AK 99775-7320
Work: 907-474-5172
cell: 907-328-9145
>
> When you start a new process with the spawn method, it runs the module just
> like it's being imported. So your global "mp_comm_queue2 = mp.Queue()" line
> creates a new Queue in each process. Your initialization of mp_comm_queue is
> also done inside the main() function, which doesn't get run in each process.
> So each process in the Pool is going to have mp_comm_queue as None, and have
> its own version of mp_comm_queue2. The ID being the same or different is the
> result of one or more processes in the Pool being reused for multiple steps
> of the imap, probably because the function that the Pool is executing
> finishes so quickly.
>
> Add a little extra info to the print calls (and/or set up logging to stdout
> with the process name/id included) and you can see some of this. Here are the
> hacked-together changes I did for that:
>
> import multiprocessing as mp
> import os
>
> mp_comm_queue = None  # Will be initialized in the main function
> mp_comm_queue2 = mp.Queue()  # Test pre-initialized as well
>
> def some_complex_function(x):
>     print("proc id", os.getpid())
>     print("mp_comm_queue", mp_comm_queue)
>     print("queue2 id", id(mp_comm_queue2))
>     mp_comm_queue2.put(x)
>     print("queue size", mp_comm_queue2.qsize())
>     print("x", x)
>     return x * 2
>
> def main():
>     global mp_comm_queue
>     # Initialize the Queue
>     mp_comm_queue = mp.Queue()
>
>     # Set up a pool to process a bunch of stuff in parallel
>     pool = mp.Pool()
>     values = range(20)
>     data = pool.imap(some_complex_function, values)
>
>     for val in data:
>         print(f"**{val}**")
>     print("final queue2 size", mp_comm_queue2.qsize())
>
> if __name__ == "__main__":
>     main()
>
>
>
> When making your own Process object and starting it, the Queue should be
> passed into the function as an argument, yes. The error text seems to come
> from the Pool implementation, which I'm not familiar enough with to know the
> best way to handle it. (Probably something using the "initializer" and
> "initargs" arguments for Pool, maybe along the lines of the sketch below.)
>
>
>
> -----Original Message-----
> From: Python-list <[email protected]
> <mailto:[email protected]>> On Behalf
> Of Israel Brewster
> Sent: Monday, April 6, 2020 1:24 PM
> To: Python <[email protected] <mailto:[email protected]>>
> Subject: Multiprocessing queue sharing and python3.8
>
> Under python 3.7 (and all previous versions I have used), the following code
> works properly, and produces the expected output:
>
> import multiprocessing as mp
>
> mp_comm_queue = None  # Will be initialized in the main function
> mp_comm_queue2 = mp.Queue()  # Test pre-initialized as well
>
> def some_complex_function(x):
>     print(id(mp_comm_queue2))
>     assert(mp_comm_queue is not None)
>     print(x)
>     return x * 2
>
> def main():
>     global mp_comm_queue
>     # Initialize the Queue
>     mp_comm_queue = mp.Queue()
>
>     # Set up a pool to process a bunch of stuff in parallel
>     pool = mp.Pool()
>     values = range(20)
>     data = pool.imap(some_complex_function, values)
>
>     for val in data:
>         print(f"**{val}**")
>
> if __name__ == "__main__":
>     main()
>
> Under 3.7, mp_comm_queue2 has the same ID for all iterations of
> some_complex_function, and the assert passes (mp_comm_queue is not None).
> Under python 3.8, however, it fails: mp_comm_queue2 is a *different* object
> for each iteration, and the assert fails.
>
> So what am I doing wrong with the above example block? Assuming that it broke
> in 3.8 because I wasn’t sharing the Queue properly, what is the proper way to
> share a Queue object among multiple processes for the purposes of
> inter-process communication?
>
> The documentation
> (https://docs.python.org/3.8/library/multiprocessing.html#exchanging-objects-between-processes)
> appears to indicate that I should pass the queue as an argument to the
> function to be executed in parallel; however, that fails as well (on ALL
> versions of python I have tried) with the error:
>
> Traceback (most recent call last):
>   File "test_multi.py", line 32, in <module>
>     main()
>   File "test_multi.py", line 28, in main
>     for val in data:
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 748, in next
>     raise value
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 431, in _handle_tasks
>     put(task)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 206, in send
>     self._send_bytes(_ForkingPickler.dumps(obj))
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
>     cls(buf, protocol).dump(obj)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 58, in __getstate__
>     context.assert_spawning(self)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 356, in assert_spawning
>     ' through inheritance' % type(obj).__name__
> RuntimeError: Queue objects should only be shared between processes through inheritance
>
> after I add the following to the code to try passing the queue rather than
> having it global:
>
> # Try by passing the queue
> values = [(x, mp_comm_queue) for x in range(20)]
> data = pool.imap(some_complex_function, values)
> for val in data:
>     print(f"**{val}**")
>
> So if I can’t pass it as an argument, and having it global is incorrect (at
> least starting with 3.8), what is the proper method of getting
> multiprocessing queues to child processes?
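>
> (For what it's worth, a manager-based queue should presumably work as an
> argument, since the proxy object pickles cleanly - something like the
> sketch below - but I don't know if that's the intended approach:)
>
> import multiprocessing as mp
>
> def some_complex_function(args):
>     x, q = args  # unpack the (value, queue-proxy) tuple
>     q.put(x)
>     return x * 2
>
> def main():
>     manager = mp.Manager()
>     q = manager.Queue()  # proxy object; picklable, unlike mp.Queue
>     pool = mp.Pool()
>     values = [(x, q) for x in range(20)]
>     for val in pool.imap(some_complex_function, values):
>         print(f"**{val}**")
>     print("final queue size", q.qsize())
>
> if __name__ == "__main__":
>     main()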
>
> ---
> Israel Brewster
> Software Engineer
> Alaska Volcano Observatory
> Geophysical Institute - UAF
> 2156 Koyukuk Drive
> Fairbanks AK 99775-7320
> Work: 907-474-5172
> cell: 907-328-9145
>
--
https://mail.python.org/mailman/listinfo/python-list