Nir Soffer has posted comments on this change.

Change subject: executor: introduce the executor library
......................................................................

Patch Set 21:

(4 comments)

http://gerrit.ovirt.org/#/c/29191/21/lib/vdsm/executor.py
File lib/vdsm/executor.py:

Line 69: with self._lock:
Line 70: self._running = False
Line 71: self._tasks.clear()
Line 72: for i in range(self._workers_count):
Line 73: self._tasks.put((_STOP, 0))
Let's use None here, which means no timeout. It does not matter, because the code should check for _STOP, but 0 simply does not make sense.

Line 74: workers = tuple(self._workers) if wait else ()
Line 75: for worker in workers:
Line 76: try:
Line 77: worker.join()

Line 210: def __init__(self, max_tasks):
Line 211: self._max_tasks = max_tasks
Line 212: self._tasks = collections.deque()
Line 213: # deque is thread safe - we can append and pop from both ends without
Line 214: # additioanl locking. We need this condition only for waiting.
Let's add a URL to the docs about this:
https://docs.python.org/2.6/library/queue.html#Queue.Full

Yes, this fact is documented in the Queue docs instead of the deque docs :-)

Line 215: self._cond = threading.Condition(threading.Lock())
Line 216: self._waiters = 0
Line 217:
Line 218: def put(self, task):

Line 222: if self._waiters > 0:
Line 223: with self._cond:
Line 224: self._cond.notify()
Line 225:
Line 226: def get(self):
We have a tiny race here:

1. Thread 1 calls get() and finds the queue empty.
2. Context switch to thread 2.
3. Thread 2 calls put(), adding a task to the queue.
4. Thread 2 checks self._waiters, sees 0, and returns without notifying.
5. Context switch to thread 1.
6. Thread 1 sets _waiters to 1 and waits on the condition.

I think that we will never see this race during sampling, since the waiting thread will be woken up by the next message, and we have a constant stream of tasks. But as a general utility, we should not have such races.
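
The interleaving above can be replayed step by step in a single thread. This is only a sketch: replay_lost_wakeup and its local variables are made up for illustration and are not part of the patch, and the wait uses a short timeout only so that the replay terminates, where the code under review would block with no timeout.

```python
import collections
import threading


def replay_lost_wakeup(timeout=0.1):
    """Replay steps 1-6 in one thread; return (notified, woke, pending)."""
    tasks = collections.deque()
    cond = threading.Condition(threading.Lock())
    waiters = 0

    # Step 1: "thread 1" calls get() and finds the queue empty.
    try:
        tasks.popleft()
    except IndexError:
        pass

    # Steps 2-4: "thread 2" calls put(): it appends the task, sees no
    # waiters, and returns without notifying.
    tasks.append("task")
    notified = False
    if waiters > 0:
        with cond:
            cond.notify()
        notified = True

    # Steps 5-6: back in "thread 1", which bumps waiters and waits.
    # Nobody is left to notify, so only the timeout ends the wait.
    with cond:
        waiters += 1
        woke = cond.wait(timeout)  # False on timeout (Python 3.2+)
        waiters -= 1

    return notified, bool(woke), len(tasks)
```

Calling replay_lost_wakeup() returns (False, False, 1): no notification was sent, the wait ended only because of the timeout, and one task is still sitting in the queue.
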
Line 227: while True:
Line 228: try:
Line 229: return self._tasks.popleft()
Line 230: except IndexError:

Line 229: return self._tasks.popleft()
Line 230: except IndexError:
Line 231: with self._cond:
Line 232: self._waiters += 1
Line 233: self._cond.wait()
To avoid the race explained above:

    if not self._tasks:
        self._cond.wait()

So if another thread added a task before I bumped _waiters, I will not wait. Hopefully I'm not wrong about this; avoiding unneeded locking is tricky.

Line 234: self._waiters -= 1
Line 235:
Line 236: def clear(self):

--
To view, visit http://gerrit.ovirt.org/29191

Gerrit-MessageType: comment
Gerrit-Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2
Gerrit-PatchSet: 21
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <[email protected]>
Gerrit-Reviewer: Antoni Segura Puimedon <[email protected]>
Gerrit-Reviewer: Dan Kenigsberg <[email protected]>
Gerrit-Reviewer: Dima Kuznetsov <[email protected]>
Gerrit-Reviewer: Federico Simoncelli <[email protected]>
Gerrit-Reviewer: Francesco Romani <[email protected]>
Gerrit-Reviewer: Michal Skrivanek <[email protected]>
Gerrit-Reviewer: Nir Soffer <[email protected]>
Gerrit-Reviewer: Saggi Mizrahi <[email protected]>
Gerrit-Reviewer: Vinzenz Feenstra <[email protected]>
Gerrit-Reviewer: Yaniv Bronhaim <[email protected]>
Gerrit-Reviewer: [email protected]
Gerrit-Reviewer: oVirt Jenkins CI Server
Gerrit-HasComments: Yes
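
A minimal sketch of the re-check suggested for line 233, assuming CPython, where the GIL keeps the append in put() and the _waiters update in get() from being observed out of order. The class name TaskQueueSketch is hypothetical, the max_tasks limit and the lines of put() not quoted in this review are left out, and the emptiness test is written as `not self._tasks` since collections.deque has no empty() method.

```python
import collections
import threading


class TaskQueueSketch(object):
    """Hypothetical stand-in for the reviewed queue, without max_tasks."""

    def __init__(self):
        self._tasks = collections.deque()
        self._cond = threading.Condition(threading.Lock())
        self._waiters = 0

    def put(self, task):
        self._tasks.append(task)
        # Take the lock only when some thread may be waiting.
        if self._waiters > 0:
            with self._cond:
                self._cond.notify()

    def get(self):
        while True:
            try:
                return self._tasks.popleft()
            except IndexError:
                with self._cond:
                    self._waiters += 1
                    try:
                        # Re-check after bumping _waiters: a task added
                        # before the bump is seen here, and a task added
                        # after it makes put() notify.
                        if not self._tasks:
                            self._cond.wait()
                    finally:
                        self._waiters -= 1
```

With one consumer draining the queue while another thread feeds it, every get() should return, including the one for the last task, which is the case the reviewed code could miss when no further put() follows.
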
