New submission from john ryan <jr...@tactiq.co.uk>:
I am building an application that is made up of several separate processes, where each process is a Python program. They are all started by the supervisord utility and execute within a venv running Python 3.8.5 (default, Aug 13 2020, 15:42:06) [GCC 7.5.0] on linux, under Ubuntu 18.04.

I am using a multiprocessing BaseManager to implement a repository of queues. Each process asks for a queue by name, then uses put/get on that queue. The application needs to be resilient, so it must be possible to restart the repository process and have the various client processes reconnect to the queues hosted by it.

The problem I am getting is that the first call to `get_queue()` after restarting the BaseManager server process does not return a queue.

The sequence below shows some testing by hand. (My test environment runs Ubuntu in a VirtualBox VM hosted on Windows 8.1.) Here I started the server in a different terminal, then started python as below (both pythons in the same venv). This works as expected, with the first call to `get_queue` returning a queue.

```
(hydra_env) john@u1804-VirtualBox:~/sw/code/hydra$ python
Python 3.8.5 (default, Aug 13 2020, 15:42:06)
[GCC 7.5.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue')
>>> mgr = QueueManager(address=('localhost', 50000), authkey=b'abracadabra' )
>>> mgr.connect()
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<multiprocessing.queues.Queue object at 0x7f98403c1820>
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<multiprocessing.queues.Queue object at 0x7f98403c1820>
```

Stop and restart the server to see the problem. The first call to `get_queue` seems to succeed, but in fact it has failed, as shown by the `print(str(q))`. The second call to `get_queue` succeeds.
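The queue-repository pattern described above can be tried without supervisord or a second terminal. The sketch below is a stand-in, not the reporter's code: it assumes the server may run in a daemon thread of the same process on an OS-chosen port (the report uses a separate process on port 50000), keeps its queues in a module-level dict of `queue.Queue` objects rather than `multiprocessing.Queue`, and uses hypothetical names (`QueueServer`, `QueueClient`, `start_server_thread`). It covers only the normal case; it does not restart the server.

```python
import queue
import threading
from multiprocessing.managers import BaseManager

AUTHKEY = b"abracadabra"  # same key as in the report; any bytes value works

_queues = {}                     # qname -> queue.Queue, lives in the server
_queues_lock = threading.Lock()  # guards _queues across server handler threads


def get_queue(qname, src=None):
    """Return the queue stored under qname, creating it on first request."""
    with _queues_lock:
        if qname not in _queues:
            _queues[qname] = queue.Queue()
        return _queues[qname]


class QueueServer(BaseManager):
    """Server side (hypothetical name): get_queue runs inside the server."""


QueueServer.register("get_queue", callable=get_queue)


class QueueClient(BaseManager):
    """Client side (hypothetical name): get_queue returns a proxy."""


QueueClient.register("get_queue")


def start_server_thread():
    """Serve from a daemon thread on an OS-chosen port; return the bound address."""
    server = QueueServer(address=("localhost", 0), authkey=AUTHKEY).get_server()
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server.address


address = start_server_thread()

producer = QueueClient(address=address, authkey=AUTHKEY)
producer.connect()
consumer = QueueClient(address=address, authkey=AUTHKEY)
consumer.connect()

# Two separate clients ask for the same name and get proxies to one queue.
producer.get_queue("name", "src").put("hello")
received = consumer.get_queue("name", "src").get(timeout=5)
print(received)  # hello
```

Two `BaseManager` subclasses are used so that the server registers `get_queue` with a callable while the clients register it without one, mirroring the split between the server script and the interactive session in this report.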
After the restart:

```
>>> mgr.connect()
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<AutoProxy[get_queue] object, typeid 'get_queue' at 0x7f280afdc160; '__str__()' failed>
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<multiprocessing.queues.Queue object at 0x7ff5f1b46820>
```

The server logs show it sent queues on all 4 calls:

```
^C(hydra_env) john@u1804-VirtualBox:~/sw/code/hydra$ python ../../trials/test_mgr.py
starting
serving <multiprocessing.queues.Queue object at 0x7f98403c1820>
serving <multiprocessing.queues.Queue object at 0x7f98403c1820>
^C(hydra_env) john@u1804-VirtualBox:~/sw/code/hydra$ python ../../trials/test_mgr.py
starting
serving <multiprocessing.queues.Queue object at 0x7ff5f1b46820>
serving <multiprocessing.queues.Queue object at 0x7ff5f1b46820>
```

I get the same behaviour if I re-instantiate the local manager object:

```
>>> mgr = QueueManager(address=('localhost', 50000), authkey=b'abracadabra' )
>>> mgr.connect()
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<AutoProxy[get_queue] object, typeid 'get_queue' at 0x7f280afdc2b0; '__str__()' failed>
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<multiprocessing.queues.Queue object at 0x7ff8dabd7820>
>>>
```

I even get the same behaviour if I just call `get_queue()` after restarting the server (i.e. without explicitly reconnecting).

I would have expected the first call to `get_queue()` to return a valid queue, since neither it nor the call to `connect()` raised any kind of error. It seems to me that some kind of held state is the underlying cause of the issue. I did some investigating in but I was not able to work out what was happening.

I found that it was possible to get into a state where a valid queue was never returned by `get_queue()` if an error had been raised by `get_nowait()` first.
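The `'__str__()' failed` text appears because `str()` on a manager proxy catches the exception from its remote call and returns a fallback string, so a bad proxy goes unnoticed until it is used. A hedged client-side guard is sketched below: fetch a proxy, make one cheap remote call on it, and fetch again if that call raises `EOFError` or `OSError` (the latter covers `BrokenPipeError`). `get_checked`, the `empty()` probe, the retry count and the delay are hypothetical choices, and `StandInProxy` is a local stand-in that only imitates the bad-then-good sequence in the transcript; it does not talk to a server.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def get_checked(fetch: Callable[[], T], probe: Callable[[T], object],
                attempts: int = 3, delay: float = 0.5) -> T:
    """Return fetch()'s result once probe() on it completes without error.

    fetch -- returns a fresh proxy, e.g. lambda: mgr.get_queue('name', 'src')
    probe -- one cheap remote call on that proxy, e.g. lambda q: q.empty()
    Only EOFError and OSError raised by probe() trigger a retry; any other
    exception, and any exception raised by fetch(), propagates immediately.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        proxy = fetch()
        try:
            probe(proxy)
            return proxy
        except (EOFError, OSError):
            if attempt == attempts - 1:
                raise  # out of attempts: re-raise the last connection error
        del proxy  # drop our reference to the bad proxy before asking again
        time.sleep(delay)


class StandInProxy:
    """Hypothetical stand-in: the first instance fails like the bad proxy."""
    created = 0

    def __init__(self):
        StandInProxy.created += 1
        self.stale = StandInProxy.created == 1

    def empty(self):
        if self.stale:
            raise EOFError  # one of the errors seen in the transcripts
        return True


q = get_checked(StandInProxy, lambda p: p.empty(), delay=0)
print(StandInProxy.created, q.stale)  # 2 False
```

Against the client session in this report, the call would look like `q = get_checked(lambda: mgr.get_queue('name', 'src'), lambda q: q.empty())`. Whether a retry actually clears the bad state is an open question: in the first transcript the second call worked, but in the `get_nowait()` case it did not for some time.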
To reproduce the `get_nowait()` case, stop the server:

```
>>> q.get_nowait()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<string>", line 2, in get_nowait
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/managers.py", line 835, in _callmethod
    kind, result = conn.recv()
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/connection.py", line 383, in _recv
    raise EOFError
EOFError
```

Restart the server, but do not call `get_queue()`:

```
>>> q.get_nowait()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<string>", line 2, in get_nowait
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/managers.py", line 834, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/connection.py", line 411, in _send_bytes
    self._send(header + buf)
  File "/home/john/.pyenv/versions/3.8.5/lib/python3.8/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<AutoProxy[get_queue] object, typeid 'get_queue' at 0x7f1e21f63070; '__str__()' failed>
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<AutoProxy[get_queue] object, typeid 'get_queue' at 0x7f1e21f63130; '__str__()' failed>
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<AutoProxy[get_queue] object, typeid 'get_queue' at 0x7f1e21f63220; '__str__()' failed>
>>> q = mgr.get_queue('name', 'src'); print(str(q))
<AutoProxy[get_queue] object, typeid 'get_queue' at 0x7f1e21f631f0; '__str__()' failed>
```

This continued while I was testing, but a queue was returned some time later, so perhaps it was stuck waiting on a timeout.

Python 3.9.0: I did a limited amount of testing on Python 3.9.0 and the results appeared to be the same.

The code for the test server is here:

```
from multiprocessing.managers import BaseManager
from multiprocessing import Queue


class QueueManager(BaseManager):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.__queues = {}  # dict of instances keyed on qname

    def get_queue(self, qname: str, src: str) -> Queue:
        # create the named queue on first request, then always serve the same one
        if qname not in self.__queues:
            self.__queues[qname] = Queue()
        the_q = self.__queues[qname]
        print(f'serving {the_q}')
        return the_q


def main() -> None:
    """main for a process serving queues forever"""
    mgr = QueueManager(address=('localhost', 50000), authkey=b'abracadabra' )
    # expose the bound method so that remote get_queue() calls run in this process
    QueueManager.register('get_queue', callable=mgr.get_queue)
    server_object = mgr.get_server()
    print('starting')
    server_object.serve_forever()


if __name__ == '__main__':
    main()
```

----------
components: Library (Lib)
messages: 379657
nosy: jryan
priority: normal
severity: normal
status: open
title: Bad proxy returned immediately after BaseManager server restarted
versions: Python 3.8, Python 3.9

_______________________________________
Python tracker <rep...@bugs.python.org>
<https://bugs.python.org/issue42154>
_______________________________________