New submission from David Chen <chenzhuowan...@163.com>:
Could someone help me out? I spent a lot of time debugging a race condition I encountered when using BaseManager and Pool within the multiprocessing library. Here is the simplified code: ``` import sys, time from multiprocessing.managers import BaseManager, SyncManager, BaseProxy from multiprocessing import Process, cpu_count, Pool, Lock, get_context from multiprocessing.queues import Queue, JoinableQueue import queue class QueueManager(BaseManager): pass class Singleton: ''' Decorator class for singleton pattern. ''' def __init__(self, cls): self._cls = cls self._lock = Lock() self._instance = {} def __call__(self, *args, **kwargs): if self._cls not in self._instance: with self._lock: self._instance[self._cls] = self._cls(*args, **kwargs) return self._instance[self._cls] def getInstance(self): return self._instance[self._cls] class LoggingServer(object): def __init__(self, address, pwd): self.logServerAddr = address self.logServerPwd = pwd self.msgQueue = queue.Queue() try: QueueManager.register('getQueue', callable=lambda: self.msgQueue) self.queueManager = QueueManager(address = self.logServerAddr, authkey = self.logServerPwd) self.logServer = self.queueManager.get_server() self.logServer.serve_forever() except: raise RuntimeError("Couldn't start the logging server!") class LoggingProcess(object): def __init__(self, address, pwd): self.logServerAddr = address self.logServerPwd = pwd try: QueueManager.register('getQueue') self.queueManager = QueueManager(address = self.logServerAddr, authkey = self.logServerPwd) self.queueManager.connect() except: raise RuntimeError("Couldn't connect logging process to the logging server!") self.msgQueue = self.queueManager.getQueue() self.process = Process(target=self.loggingProcess, name = "Logging Process", args=(), daemon = True) self.process.start() def terminate(self): self.msgQueue.join() self.process.terminate() def loggingProcess(self): while True: logObj = self.msgQueue.get() print(logObj) @Singleton class Logger(object): 
def __init__(self, address, pwd): self.logServerAddr = address self.logServerPwd = pwd self.queueManager = None self.msgQueue = None def connectToLogServer(self): try: QueueManager.register('getQueue') self.queueManager = QueueManager(address = self.logServerAddr, authkey = self.logServerPwd) self.queueManager.connect() self.msgQueue = self.queueManager.getQueue() self.ready = True except: raise RuntimeError("Couldn't connect logger to Log Server!") def ReadyCheck(func): def makeDecorator(self, *args, **kwargs): if not self.msgQueue: self.connectToLogServer() func(self, *args, **kwargs) return makeDecorator # Overrided function to log info @ReadyCheck def info(self, info, logfile = sys.stdout): self.msgQueue.put(info) address = ('', 50000) password = b'PASSWORD' log = Logger(address, password) def callback(*args): #print("Finished!!!") pass def job(index): time.sleep(0.1) log.info(str(log.msgQueue) + ":{}".format(index)) log.info("here {}".format(index)) if __name__ == "__main__": # import multiprocessing # logger = multiprocessing.log_to_stderr() # logger.setLevel(multiprocessing.SUBDEBUG) serverProcess = Process(target = LoggingServer, name = "LoggingServerDaemon", args = ((address, password)), daemon = True) serverProcess.start() time.sleep(1) loggingProcess = LoggingProcess(address, password) log.info("Starting...") #pool = Pool(cpu_count()) pool = Pool() #Using a small number of worker(like 10), no problem, but if we increase to a bigger number, say 48 in my case, this program hangs every time... results = [pool.apply_async(job, (i,), callback = callback) for i in range(1)] pool.close() pool.join() log.info("Done") #loggingProcess.terminate() #serverProcess.terminate() ``` The LoggingServer class works as a logging server (like a proxy), which manages a shared queue. The LoggingProcess class is a log consumer class, which fetches the logs from the shared queue (managed by LoggingServer). The Logger class is a producer class, which puts the logs into the shared queue. 
As I want to share the global logger across multiple modules in order to unify the log format/output places/... (something like the logging standard library), the Logger class is not fully initialized up front and will be fully initialized later when it is first used (please see connectToLogServer). I highly suspect this is the root cause of the program hang, but I can't go further... The hung sub-process's (ForkPoolWorker) traceback looks like the following (obtained using py-spy): ``` Process 3958088: python3 Logger.py Python v3.9.0 (/usr/bin/python3.9) Thread 3958088 (idle): "MainThread" _recv (/usr/lib/python3.9/multiprocessing/connection.py:384) _recv_bytes (/usr/lib/python3.9/multiprocessing/connection.py:419) recv_bytes (/usr/lib/python3.9/multiprocessing/connection.py:221) answer_challenge (/usr/lib/python3.9/multiprocessing/connection.py:757) Client (/usr/lib/python3.9/multiprocessing/connection.py:513) _decref (/usr/lib/python3.9/multiprocessing/managers.py:861) __call__ (/usr/lib/python3.9/multiprocessing/util.py:224) _run_finalizers (/usr/lib/python3.9/multiprocessing/util.py:300) _exit_function (/usr/lib/python3.9/multiprocessing/util.py:334) _bootstrap (/usr/lib/python3.9/multiprocessing/process.py:318) _launch (/usr/lib/python3.9/multiprocessing/popen_fork.py:71) __init__ (/usr/lib/python3.9/multiprocessing/popen_fork.py:19) _Popen (/usr/lib/python3.9/multiprocessing/context.py:277) start (/usr/lib/python3.9/multiprocessing/process.py:121) _repopulate_pool_static (/usr/lib/python3.9/multiprocessing/pool.py:326) _repopulate_pool (/usr/lib/python3.9/multiprocessing/pool.py:303) __init__ (/usr/lib/python3.9/multiprocessing/pool.py:212) Pool (/usr/lib/python3.9/multiprocessing/context.py:119) <module> (/slowfs/cn59sig01/usr/zhuoc/work/qualification-kit/reproducer/Logger.py:129) ``` It seems the refcount of the shared queue failed to be decremented... I googled a lot, but nothing seems to match this case... so I am bringing this issue here for help. 
Any comments and suggestions are highly appreciated! Traceback after CTRL+C: ``` Traceback (most recent call last): File "/slowfs/cn59sig01/usr/zhuoc/work/qualification-kit/reproducer/Logger.py", line 43, in __init__ self.logServer.serve_forever() File "/usr/lib/python3.9/multiprocessing/managers.py", line 183, in serve_forever sys.exit(0) SystemExit: 0 During handling of the above exception, another exception occurred: Traceback (most recent call last): Traceback (most recent call last): File "/usr/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap self.run() File "/usr/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap self.run() File "/usr/lib/python3.9/multiprocessing/process.py", line 108, in run self._target(*self._args, **self._kwargs) File "/slowfs/cn59sig01/usr/zhuoc/work/qualification-kit/reproducer/Logger.py", line 68, in loggingProcess logObj = self.msgQueue.get() File "<string>", line 2, in get File "/usr/lib/python3.9/multiprocessing/managers.py", line 809, in _callmethod kind, result = conn.recv() File "/usr/lib/python3.9/multiprocessing/connection.py", line 255, in recv buf = self._recv_bytes() File "/usr/lib/python3.9/multiprocessing/connection.py", line 419, in _recv_bytes buf = self._recv(4) File "/usr/lib/python3.9/multiprocessing/connection.py", line 384, in _recv chunk = read(handle, remaining) KeyboardInterrupt File "/usr/lib/python3.9/multiprocessing/process.py", line 108, in run self._target(*self._args, **self._kwargs) File "/slowfs/cn59sig01/usr/zhuoc/work/qualification-kit/reproducer/Logger.py", line 45, in __init__ raise RuntimeError("Couldn't start the logging server!") RuntimeError: Couldn't start the logging server! 
``` ---------- components: Library (Lib) messages: 393796 nosy: chenzhuowansui priority: normal severity: normal status: open title: Race condition when using multiprocessing BaseManager and Pool in Python3 type: behavior versions: Python 3.9 _______________________________________ Python tracker <rep...@bugs.python.org> <https://bugs.python.org/issue44155> _______________________________________ _______________________________________________ Python-bugs-list mailing list Unsubscribe: https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com