Francesco Romani has posted comments on this change.

Change subject: executor: introduce the executor library
......................................................................


Patch Set 11:

(14 comments)

initial response

http://gerrit.ovirt.org/#/c/29191/11/lib/vdsm/executor.py
File lib/vdsm/executor.py:

Line 28: import logging
Line 29: import Queue
Line 30: import threading
Line 31: 
Line 32: import utils
> We can now use relative imports:
Done
Line 33: 
Line 34: 
Line 35: class ExecutorNotRunning(Exception):
Line 36:     """


Line 31: 
Line 32: import utils
Line 33: 
Line 34: 
Line 35: class ExecutorNotRunning(Exception):
> "executor.ExecutorNotRunning" repeats the "Executor" word for no reason. We 
Sure, removed the repetition.
Line 36:     """
Line 37:     Executor not yet started or shutting down.
Line 38:     """
Line 39: 


Line 37:     Executor not yet started or shutting down.
Line 38:     """
Line 39: 
Line 40: 
Line 41: class ExecutorAlreadyStarted(Exception):
> "executor.ExecutorAlreadyStarted" repeats the "Executor" word for no reason
As above
Line 42:     """
Line 43:     Executor started multiple times.
Line 44:     """
Line 45: 


Line 51:     """
Line 52: 
Line 53:     _log = logging.getLogger("Executor")
Line 54: 
Line 55:     def __init__(self, workers=10, max_tasks=1000):
> I don't think we need defaults, since we don't know how the executor is goi
Good point, removed.
Line 56:         self._workers_count = workers
Line 57:         self._tasks = Queue.Queue(max_tasks)
Line 58:         self._workers = []
Line 59:         self._lock = threading.Lock()


Line 54: 
Line 55:     def __init__(self, workers=10, max_tasks=1000):
Line 56:         self._workers_count = workers
Line 57:         self._tasks = Queue.Queue(max_tasks)
Line 58:         self._workers = []
> Let's keep workers in a set(); it makes it easier to remove them when needed.
Done
Line 59:         self._lock = threading.Lock()
Line 60:         self._running = False
Line 61:         self._discards = 0  # for debug purposes only
Line 62: 


Line 65:             if self._running:
Line 66:                 raise ExecutorAlreadyStarted
Line 67:             for _ in range(self._workers_count):
Line 68:                 self._add_worker()
Line 69:             self._running = True
> Let's put self._running before starting the threads, which may use this valu
Done
Line 70: 
Line 71:     def stop(self, wait=True):
Line 72:         for worker in self._workers:
Line 73:             self._tasks.put(_STOP)
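
A minimal sketch of the ordering discussed above, not the code from the patch: the running flag is set before any worker thread is started, so a worker that runs immediately never sees a stopped executor. MiniExecutor, AlreadyStarted and seen_running are hypothetical names used only for illustration.

```python
import threading


class AlreadyStarted(Exception):
    """Raised when start() is called on a running executor."""


class MiniExecutor(object):
    # Hypothetical, stripped-down stand-in for the Executor in the patch.

    def __init__(self, workers_count):
        self._workers_count = workers_count
        self._workers = set()
        self._lock = threading.Lock()
        self._running = False
        self.seen_running = []  # what each worker observed at startup

    def start(self):
        with self._lock:
            if self._running:
                raise AlreadyStarted()
            # Flip the flag first: worker threads may read it as soon
            # as they are started.
            self._running = True
            for _ in range(self._workers_count):
                self._add_worker()

    def _add_worker(self):
        worker = threading.Thread(target=self._run)
        worker.daemon = True
        self._workers.add(worker)
        worker.start()

    def _run(self):
        # A real worker would loop on the task queue; here we only
        # record the flag value seen when the thread starts.
        self.seen_running.append(self._running)
```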


Line 69:             self._running = True
Line 70: 
Line 71:     def stop(self, wait=True):
Line 72:         for worker in self._workers:
Line 73:             self._tasks.put(_STOP)
> This may block if tasks queue is full, so this is not a good way to stop th
Agreed. Will work on this.
Line 74:         with self._lock:
Line 75:             self._running = False
Line 76:             if wait:
Line 77:                 for worker in self._workers:
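
To illustrate the concern about a full queue: put() on a bounded queue blocks until a slot is free, while put_nowait() raises immediately. The sketch below shows only that behaviour and is not the eventual fix for stop(). It uses the Python 3 module name queue (the patch uses Python 2's Queue), and try_stop is a hypothetical helper.

```python
import queue  # spelled "Queue" on Python 2, as in the patch

_STOP = object()  # stop marker, as in the reviewed module

tasks = queue.Queue(maxsize=2)
tasks.put("task-1")
tasks.put("task-2")  # the queue is now full


def try_stop(q):
    """Try to enqueue the stop marker without blocking the caller."""
    try:
        # A plain q.put(_STOP) would block here until a worker
        # consumed a task and freed a slot.
        q.put_nowait(_STOP)
    except queue.Full:
        return False
    return True


stop_queued = try_stop(tasks)
```

With the queue full, try_stop() reports failure instead of hanging, so the caller can decide how to proceed.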


Line 72:         for worker in self._workers:
Line 73:             self._tasks.put(_STOP)
Line 74:         with self._lock:
Line 75:             self._running = False
Line 76:             if wait:
> This will deadlock because you hold the lock when exiting threads may try t
I will introduce a per-thread 'discarded' flag and wait only for the
threads that are not discarded, as you suggested in a past comment.

I will also rearchitect stop() to avoid this dangerous behaviour.
Line 77:                 for worker in self._workers:
Line 78:                     self._log.debug('worker %s stopped', worker.name)
Line 79:                     worker.join()
Line 80:                     del worker
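
One possible way to avoid the deadlock described above, sketched under assumptions: take a snapshot of the workers while holding the lock, release the lock, and only then join. The per-thread 'discarded' flag is left out for brevity. MiniExecutor and add_worker are hypothetical names, and a threading.Event stands in for the stop marker on the task queue.

```python
import threading


class MiniExecutor(object):
    # Hypothetical stand-in for the Executor in the patch.

    def __init__(self):
        self._lock = threading.Lock()
        self._workers = set()
        self._running = True
        self._wakeup = threading.Event()  # stands in for the _STOP marker

    def add_worker(self):
        worker = threading.Thread(target=self._run)
        worker.daemon = True
        with self._lock:
            self._workers.add(worker)
        worker.start()

    def _run(self):
        self._wakeup.wait()
        # An exiting worker needs the lock to unregister itself. If
        # stop() still held the lock while joining, this would deadlock.
        with self._lock:
            self._workers.discard(threading.current_thread())

    def stop(self, wait=True):
        with self._lock:
            self._running = False
            workers = list(self._workers)  # snapshot taken under the lock
        self._wakeup.set()
        if wait:
            # Join outside the lock so workers can still acquire it.
            for worker in workers:
                worker.join()
```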


Line 94:         with self._lock:
Line 95:             if not self._running:
Line 96:                 raise ExecutorNotRunning
Line 97:         task = BackgroundTask(task, self)
Line 98:         self._tasks.put(task)
> This may raise as you released the lock above, so another thread may have j
Will move the put inside the lock and switch to put_nowait().
Line 99:         return task
Line 100: 
Line 101:     @utils.traceback()
Line 102:     def _run(self):


Line 108:             try:
Line 109:                 task.execute()
Line 110:             except Exception:
Line 111:                 self._log.exception('Unhandled exception for task')
Line 112:             if task.discarded:
> This can be simplified by raising in task.execute() if task was discarded.
Good point, will change.
Line 113:                 self._log.debug('Worker stopping after task was discarded')
Line 114:                 break
Line 115:         self._log.info('worker stopped')
Line 116:         if task.discarded:


Line 122:             self._workers.remove(worker)
Line 123:             self._log.debug('worker %s left the executor', worker.name)
Line 124: 
Line 125:     def _add_worker(self):
Line 126:         worker = threading.Thread(target=self._run)
> Name this thread - to make it easy to understand the logs, I would keep a 
Done
Line 127:         worker.daemon = True
Line 128:         worker.start()
Line 129:         self._workers.append(worker)
Line 130:         self._log.debug('worker %s joined the executor', worker.name)
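
A small sketch of naming worker threads so that log lines can be attributed to them. The naming scheme is an assumption made for illustration (the review comment is truncated and does not spell one out); make_worker and _worker_ids are hypothetical names.

```python
import itertools
import threading

# Hypothetical module-level counter; itertools.count() yields 0, 1, 2, ...
_worker_ids = itertools.count()


def make_worker(target, prefix="Executor"):
    """Create a daemon thread with a readable, unique name."""
    name = "%s-worker-%d" % (prefix, next(_worker_ids))
    worker = threading.Thread(target=target, name=name)
    worker.daemon = True
    return worker
```

The name then shows up wherever worker.name is logged, and in log formats that include the thread name.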


Line 134:             self._discards += 1
Line 135:             self._add_worker()
Line 136: 
Line 137: 
Line 138: class BackgroundTask(object):
> More specific name would be ExecutorTask.
Done
Line 139:     """
Line 140:     Represents a task submitted to the Executor.
Line 141:     The client can
Line 142:     - verify if a task is in progress or not


Line 159:     def in_progress(self):
Line 160:         """
Line 161:         returns True if this task is in progress, False otherwise.
Line 162:         """
Line 163:         return self._thread is not None
> This may be a task waiting on the queue because all workers are busy, or a 
No immediate need, will remove
Line 164: 
Line 165:     def discard(self):
Line 166:         """
Line 167:         This is to be called once the task is detected as stuck.


Line 184:         finally:
Line 185:             self._thread = None
Line 186: 
Line 187: 
Line 188: _STOP = BackgroundTask(None, None)
> This should be object() - nobody gets this back and expects the BackgroundTas
Done
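
For reference, a minimal sketch of the object() sentinel idiom: the marker is compared by identity, so no real task can be mistaken for it and it needs no task-like interface. worker_loop and results are hypothetical names, and the module is spelled queue as on Python 3.

```python
import queue  # spelled "Queue" on Python 2, as in the patch
import threading

_STOP = object()  # unique marker; only ever compared with "is"


def worker_loop(tasks, results):
    while True:
        task = tasks.get()
        if task is _STOP:  # identity check, never equality
            break
        results.append(task())


tasks = queue.Queue()
results = []
worker = threading.Thread(target=worker_loop, args=(tasks, results))
worker.start()
tasks.put(lambda: 42)
tasks.put(_STOP)
worker.join()
```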


-- 
To view, visit http://gerrit.ovirt.org/29191
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2
Gerrit-PatchSet: 11
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <[email protected]>
Gerrit-Reviewer: Antoni Segura Puimedon <[email protected]>
Gerrit-Reviewer: Dan Kenigsberg <[email protected]>
Gerrit-Reviewer: Dima Kuznetsov <[email protected]>
Gerrit-Reviewer: Federico Simoncelli <[email protected]>
Gerrit-Reviewer: Francesco Romani <[email protected]>
Gerrit-Reviewer: Michal Skrivanek <[email protected]>
Gerrit-Reviewer: Nir Soffer <[email protected]>
Gerrit-Reviewer: Saggi Mizrahi <[email protected]>
Gerrit-Reviewer: Vinzenz Feenstra <[email protected]>
Gerrit-Reviewer: Yaniv Bronhaim <[email protected]>
Gerrit-Reviewer: [email protected]
Gerrit-Reviewer: oVirt Jenkins CI Server
Gerrit-HasComments: Yes
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
