Jens <multiks2...@gmail.com> added the comment:

Raymond, thanks for your suggestions.

My deployed applications don't hold 20M items at a time; that was just a way to
demonstrate the leak.

I was able to resolve the threading/queue-based leaks on my instances by
modifying the Queue, Event and Condition classes to use an external doubly linked
list library, as follows (Python 3.7). It's not the cleanest rewrite, but it
conveys the idea:

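    import queue
    import threading

    from llist import dllist  # external doubly linked list (assumed: the `llist` package)

    class DllistCondition(threading.Condition):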
        def __init__(self, lock=None):
            if lock is None:
                lock = threading.RLock()
            self._lock = lock
            self.acquire = lock.acquire
            self.release = lock.release
            try:
                self._release_save = lock._release_save
            except AttributeError:
                pass
            try:
                self._acquire_restore = lock._acquire_restore
            except AttributeError:
                pass
            try:
                self._is_owned = lock._is_owned
            except AttributeError:
                pass
            self._waiters = dllist()

        def notify(self, n=1):
            if not self._is_owned():
                raise RuntimeError("cannot notify on un-acquired lock")
            all_waiters = self._waiters
            # the stock _islice() result is empty only when there are no
            # waiters, whatever n is
            if len(all_waiters) < 1:
                return

            node = all_waiters.first
            i = 1
            while True:
                # simulate _islice(all_waiters, n): stop after n waiters
                if i > n:
                    return

                waiter = node.value
                # get the next node before this one is unlinked
                node_next = node.next
                waiter.release()
                try:
                    all_waiters.remove(node)
                except ValueError:
                    pass

                i += 1
                node = node_next
                # stop after the last node
                if node is None:
                    return

        def wait(self, timeout=None):
            if not self._is_owned():
                raise RuntimeError("cannot wait on un-acquired lock")
            waiter = threading._allocate_lock()
            waiter.acquire()
            node = self._waiters.append(waiter)
            saved_state = self._release_save()
            gotit = False
            try:    # restore state no matter what (e.g., KeyboardInterrupt)
                if timeout is None:
                    waiter.acquire()
                    gotit = True
                else:
                    if timeout > 0:
                        gotit = waiter.acquire(True, timeout)
                    else:
                        gotit = waiter.acquire(False)
                return gotit
            finally:
                self._acquire_restore(saved_state)
                if not gotit:
                    try:
                        self._waiters.remove(node)
                    except ValueError:
                        pass

    class DllistEvent(threading.Event):
        def __init__(self):
            self._cond = DllistCondition(threading.Lock())
            self._flag = False

    class DllistQueue(queue.Queue):

        def __init__(self, maxsize=0):
            self.maxsize = maxsize
            self._init(maxsize)

            self.mutex = threading.Lock()

            self.not_empty = DllistCondition(self.mutex)

            self.not_full = DllistCondition(self.mutex)

            self.all_tasks_done = DllistCondition(self.mutex)
            self.unfinished_tasks = 0

        def _init(self, maxsize):
            self.queue = dllist()
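
For anyone who wants to try the classes above without installing the external package, here is a minimal sketch of a pure-Python stand-in. The names `Node` and `DoublyLinkedList` are hypothetical and not part of any library. The sketch assumes the classes only rely on `append()` returning the new node, `remove(node)` raising ValueError for a node that was already removed, `first`, `node.value`, `node.next`, `len()` and, through queue.Queue's `_get()`, `popleft()`. The point of the structure is that a waiter's node can be unlinked in O(1) without scanning or copying the list.

```python
class Node:
    """One link in the list; exposes the attributes the Dllist* classes read."""
    __slots__ = ("value", "prev", "next", "owner")

    def __init__(self, value, owner):
        self.value = value
        self.prev = None
        self.next = None
        self.owner = owner  # list this node currently belongs to, or None


class DoublyLinkedList:
    """Hypothetical stand-in for dllist: append() returns the new node so
    that it can later be unlinked in O(1) with remove(node)."""

    def __init__(self):
        self.first = None
        self.last = None
        self._size = 0

    def __len__(self):
        return self._size

    def append(self, value):
        node = Node(value, self)
        node.prev = self.last
        if self.last is None:
            self.first = node
        else:
            self.last.next = node
        self.last = node
        self._size += 1
        return node

    def remove(self, node):
        # A node that was already unlinked (or belongs elsewhere) is rejected,
        # matching the `except ValueError` guards in the classes above.
        if node.owner is not self:
            raise ValueError("node does not belong to this list")
        if node.prev is None:
            self.first = node.next
        else:
            node.prev.next = node.next
        if node.next is None:
            self.last = node.prev
        else:
            node.next.prev = node.prev
        node.prev = node.next = node.owner = None
        self._size -= 1
        return node.value

    def popleft(self):
        # Used by queue.Queue._get() when self.queue is this list.
        if self.first is None:
            raise IndexError("popleft from an empty list")
        return self.remove(self.first)
```

Binding `dllist = DoublyLinkedList` in place of the import would let the Dllist* classes run on the standard library alone; whether that substitution reproduces the fix reported here is untested.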

I'm not sure the Queue itself needed its `self.queue = deque()` replaced, but
I am sure the conditions needed the stock `self._waiters = deque()`, and the
code that uses it, to be replaced.

Memory profiling consistently showed a leak at
`waiters_to_notify = _deque(_islice(all_waiters, n))` in the threading.Condition
class, which is used by both the queue.Queue and threading.Event classes.
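
Profiling of this kind can be approximated with the standard-library tracemalloc module. Below is a minimal sketch, assuming a single-threaded put()/get() loop on a stock queue.Queue is a reasonable stand-in for the real workload (it is not the reproduction used above). It takes snapshots before and after the loop, keeps only allocations made from lines in threading.py, and compares them line by line. A single-threaded loop never has waiters, so it exercises the per-call deque creation in notify() but may well show no growth at all.

```python
import queue
import threading
import tracemalloc


def threading_allocation_growth(iterations=20_000, top=5):
    """Run put()/get() on a stock queue.Queue and return the largest changes
    in memory allocated from lines of threading.py over the run."""
    q = queue.Queue()
    tracemalloc.start()
    try:
        before = tracemalloc.take_snapshot()
        for i in range(iterations):
            q.put(i)  # put() calls not_empty.notify()
            q.get()   # get() calls not_full.notify()
        after = tracemalloc.take_snapshot()
    finally:
        tracemalloc.stop()
    # Keep only traces whose most recent frame lies in threading.py.
    only_threading = [tracemalloc.Filter(True, threading.__file__)]
    diffs = after.filter_traces(only_threading).compare_to(
        before.filter_traces(only_threading), "lineno")
    return diffs[:top]


for stat in threading_allocation_growth():
    print(stat)
```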

After the modification this leak is gone, and I suspect it has something to do
with re-creating the deque at `waiters_to_notify = _deque(_islice(all_waiters, n))`
on every notify() call.
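
A related stdlib-only sketch of the same idea avoids the temporary deque that the stock notify() builds on each call. The hypothetical `NoCopyCondition` subclass pops the oldest waiters directly off the stock `self._waiters` deque (a private attribute of threading.Condition). This is a swapped-in technique, not the change described above, and it has not been checked against the reported leak.

```python
import threading


class NoCopyCondition(threading.Condition):
    """Hypothetical threading.Condition variant whose notify() wakes the n
    oldest waiters by popping them straight off self._waiters (a
    collections.deque in the stock class) instead of first copying them
    into a temporary deque with _islice()."""

    def notify(self, n=1):
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        waiters = self._waiters
        # Wake at most n waiters, oldest first; stop early if fewer wait.
        while n > 0 and waiters:
            waiter = waiters.popleft()
            waiter.release()  # unblocks the thread parked in wait()
            n -= 1
```

Because notify() always wakes the oldest waiters, popping from the left selects the same waiters that `_islice(all_waiters, n)` did. A waiter that timed out still removes itself in wait(), and that `remove()` call already tolerates a missing entry by catching ValueError.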

In any case, I'm leaving this here in case anyone else runs into it and finds
it helpful.

Thanks

----------

_______________________________________
Python tracker <rep...@bugs.python.org>
<https://bugs.python.org/issue43911>
_______________________________________