Jens <[email protected]> added the comment:
Raymond, thanks for your suggestions.
My deployed applications don't hold 20M items at a time; that was just a way to
demonstrate the leak.
I was able to resolve the threading/queue-based leaks on my instances by
modifying the Queue, Event and Condition classes to use an external doubly
linked list library in the following manner (python3.7). It is not the clearest
rewrite, but it is just to give the idea:
class DllistCondition(threading.Condition):
    """Condition variable that keeps its waiters in a ``dllist``.

    Behaves like ``threading.Condition`` except that ``self._waiters`` is a
    doubly linked list (``dllist`` -- presumably from the external ``llist``
    package; confirm against the file's imports) rather than a deque.
    ``notify`` walks that list in place, so no temporary container of
    waiters is built per call.
    """

    def __init__(self, lock=None):
        """Bind *lock* (a new ``RLock`` when None) and create the waiter list.

        The stock constructor does all the lock wiring; only the waiter
        container is swapped afterwards, so this class does not have to
        mirror the standard library's private setup code.
        """
        super().__init__(lock)
        self._waiters = dllist()

    def notify(self, n=1):
        """Wake up at most *n* waiting threads, oldest first.

        Raises:
            RuntimeError: if the calling thread does not own the lock.
        """
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self._waiters
        # ``first`` is None for an empty list, so the loop simply does not
        # run when there is nobody to wake up (or when n < 1).
        node = all_waiters.first
        notified = 0
        while node is not None and notified < n:
            # Fetch the successor before the current node is unlinked.
            node_next = node.next
            node.value.release()
            try:
                all_waiters.remove(node)
            except ValueError:
                # Node is no longer part of the list; nothing to unlink.
                pass
            notified += 1
            node = node_next

    def wait(self, timeout=None):
        """Release the lock and block until notified or *timeout* expires.

        Args:
            timeout: seconds to wait; None blocks indefinitely, a value
                <= 0 performs a single non-blocking attempt.

        Returns:
            True if the thread was notified, False if the wait timed out.

        Raises:
            RuntimeError: if the calling thread does not own the lock.
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        # A private, already-held lock: notify() releases it to wake us.
        waiter = threading.Lock()
        waiter.acquire()
        # Keep the list node so a timed-out wait can unlink itself directly.
        node = self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:  # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            elif timeout > 0:
                gotit = waiter.acquire(True, timeout)
            else:
                gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)
            if not gotit:
                # Not notified: drop our entry so it is never released later.
                try:
                    self._waiters.remove(node)
                except ValueError:
                    pass
class DllistEvent(threading.Event):
    """``threading.Event`` whose internal condition is a ``DllistCondition``."""

    def __init__(self):
        """Create a cleared event backed by a ``DllistCondition``."""
        # Let the parent set up its own state (including the False flag),
        # then swap in the linked-list-based condition.
        super().__init__()
        self._cond = DllistCondition(threading.Lock())
class DllistQueue(queue.Queue):
    """``queue.Queue`` built on ``dllist`` storage and ``DllistCondition``s.

    The item container and the three condition variables are replaced; all
    other behaviour is inherited from ``queue.Queue``.
    """

    def __init__(self, maxsize=0):
        """Create the queue; *maxsize* <= 0 means unbounded, as in the parent.

        The parent constructor is run first so that every attribute it
        defines exists (newer Python versions add more than the ones copied
        from 3.7, e.g. shutdown state); the conditions are then rebuilt on
        top of the same mutex.
        """
        super().__init__(maxsize)
        self.not_empty = DllistCondition(self.mutex)
        self.not_full = DllistCondition(self.mutex)
        self.all_tasks_done = DllistCondition(self.mutex)

    def _init(self, maxsize):
        """Allocate the underlying item container as a ``dllist``."""
        self.queue = dllist()
Now, I'm not exactly sure that the Queue itself required replacing
`self.queue = deque()`, but I'm sure that the conditions required getting rid of
the stock `self._waiters = deque()` and the consequent use of it.
Memory profiling consistently showed a leak at `waiters_to_notify =
_deque(_islice(all_waiters, n))` in the threading.Condition class, which is
employed by both the queue.Queue and threading.Event classes.
After the modification this leak is gone, and I suspect it has something to do
with re-creating the deque at `waiters_to_notify = _deque(_islice(all_waiters, n))`.
In any case, I'm leaving this here in case anyone else has to deal with it as
well; it might be helpful.
Thanks
----------
_______________________________________
Python tracker <[email protected]>
<https://bugs.python.org/issue43911>
_______________________________________
_______________________________________________
Python-bugs-list mailing list
Unsubscribe:
https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com