Jens <multiks2...@gmail.com> added the comment:
Raymond, thanks for your suggestions. My deployed applications don't hold up 20m items at a time, that was a way to show the leak. I was able to resolve the threading, queue-based leaks on my instances by modifying the Queue, Event and Conditions classes to use external doubly linked list library in the following manner (python3.7), not the clearest rewrite, but just to get the idea: class DllistCondition(threading.Condition): def __init__(self, lock=None): if lock is None: lock = threading.RLock() self._lock = lock self.acquire = lock.acquire self.release = lock.release try: self._release_save = lock._release_save except AttributeError: pass try: self._acquire_restore = lock._acquire_restore except AttributeError: pass try: self._is_owned = lock._is_owned except AttributeError: pass self._waiters = dllist() def notify(self, n=1): if not self._is_owned(): raise RuntimeError("cannot notify on un-acquired lock") all_waiters = self._waiters waiters_to_notify = all_waiters #_islize would be empty only if there are no waiters avail, for any n if len(waiters_to_notify) < 1: return node = waiters_to_notify.first i = 1 while True: #simulate _islice if i > n: return waiter = node.value #get next node before release node_next = node.next waiter.release() try: all_waiters.remove(node) except ValueError: pass i += 1 node = node_next #if it's the last node, return if node is None: return def wait(self, timeout=None): if not self._is_owned(): raise RuntimeError("cannot wait on un-acquired lock") waiter = threading._allocate_lock() waiter.acquire() node = self._waiters.append(waiter) saved_state = self._release_save() gotit = False try: # restore state no matter what (e.g., KeyboardInterrupt) if timeout is None: waiter.acquire() gotit = True else: if timeout > 0: gotit = waiter.acquire(True, timeout) else: gotit = waiter.acquire(False) return gotit finally: self._acquire_restore(saved_state) if not gotit: try: self._waiters.remove(node) except ValueError: pass class DllistEvent(threading.Event): def __init__(self): self._cond = DllistCondition(threading.Lock()) self._flag = False class DllistQueue(queue.Queue): def __init__(self, maxsize=0): self.maxsize = maxsize self._init(maxsize) self.mutex = threading.Lock() self.not_empty = DllistCondition(self.mutex) self.not_full = DllistCondition(self.mutex) self.all_tasks_done = DllistCondition(self.mutex) self.unfinished_tasks = 0 def _init(self, maxsize): self.queue = dllist() Now, I'm not exactly sure that the Queue itself required the `self.queue = deque()`modification, but I'm sure that conditions required getting rid of the stock `self._waiters = deque()` and the consequent use of it. Memory profiling constantly showed a leak at waiters_to_notify = _deque(_islice(all_waiters, n) in the threading.Condition class, which is both employed by threading.Queue and treading.Event classes. After the modifiction this leak is gone and I suspect it has to do something with reiniting the deque at `waiters_to_notify = _deque(_islice(all_waiters, n)` In any cases, I'm leaving this out here in case anyone get to deal with it as well and it might be helpful. Thanks ---------- _______________________________________ Python tracker <rep...@bugs.python.org> <https://bugs.python.org/issue43911> _______________________________________ _______________________________________________ Python-bugs-list mailing list Unsubscribe: https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com