Francesco Romani has posted comments on this change.

Change subject: executor: introduce the executor library
......................................................................


Patch Set 10:

(25 comments)

http://gerrit.ovirt.org/#/c/29191/10/lib/vdsm/executor.py
File lib/vdsm/executor.py:

Line 36:     pass
Line 37: 
Line 38: 
Line 39: def _COLLECTOR(result):
Line 40:     pass
> Can you move private stuff like this to the end of the file? It is easier t
Sure, will have the public interface first.
Line 41: 
Line 42: 
Line 43: class NoAvailableWorker(Exception):
Line 44:     """


Line 45:     No available worker to carry the task.
Line 46:     """
Line 47: 
Line 48: 
Line 49: class PoolNotRunning(Exception):
> Lets go with your choice and name it ExecutorNotRunning
Done
Line 50:     """
Line 51:     Pool not yet started or shutting down.
Line 52:     """
Line 53: 


Line 59:     """
Line 60: 
Line 61:     _log = logging.getLogger("Executor")
Line 62: 
Line 63:     def __init__(self, num_workers=10):
> I think that "workers" or "size" is much more readable then the cryptic num
Picked 'workers'
Line 64:         self._num_workers = num_workers
Line 65:         self._tasks = Queue.Queue()
Line 66:         self._workers = []
Line 67:         self._resize_lock = threading.Lock()


Line 61:     _log = logging.getLogger("Executor")
Line 62: 
Line 63:     def __init__(self, num_workers=10):
Line 64:         self._num_workers = num_workers
Line 65:         self._tasks = Queue.Queue()
> We need to limit the size - you don't want to hold here 1000's of tasks.
Done
Line 66:         self._workers = []
Line 67:         self._resize_lock = threading.Lock()
Line 68:         self._running = False
Line 69:         self._evictions = 0  # for debug purposes only
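
For reference, a minimal sketch of what limiting the queue size could look like. It is written for Python 3, where the module is named `queue` (the patch uses the Python 2 name `Queue`). `BoundedTasks`, `TooManyTasks`, `max_tasks` and the default of 500 are hypothetical names and values chosen for illustration, not taken from the patch.

```python
import queue  # named Queue on Python 2, as in the patch


class TooManyTasks(Exception):
    """Hypothetical error: the task queue is full."""


class BoundedTasks(object):
    """Hypothetical stand-in for the executor's task queue."""

    def __init__(self, max_tasks=500):
        # With maxsize set, put_nowait() raises queue.Full once
        # max_tasks entries are pending, instead of growing forever.
        self._tasks = queue.Queue(maxsize=max_tasks)

    def dispatch(self, task):
        try:
            self._tasks.put_nowait(task)
        except queue.Full:
            # Report the overload to the caller rather than blocking.
            raise TooManyTasks()

    def pending(self):
        return self._tasks.qsize()
```

Failing fast with an exception, rather than blocking in put(), leaves the decision about an overloaded executor to the caller.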


Line 63:     def __init__(self, num_workers=10):
Line 64:         self._num_workers = num_workers
Line 65:         self._tasks = Queue.Queue()
Line 66:         self._workers = []
Line 67:         self._resize_lock = threading.Lock()
> I would call this _lock, since you need to protect more then the workers li
Done
Line 68:         self._running = False
Line 69:         self._evictions = 0  # for debug purposes only
Line 70: 
Line 71:     def start(self):


Line 68:         self._running = False
Line 69:         self._evictions = 0  # for debug purposes only
Line 70: 
Line 71:     def start(self):
Line 72:         with self._resize_lock:
> You should raise if executor is started multiple times.
Done
Line 73:             for _ in range(self._num_workers):
Line 74:                 self._add_worker()
Line 75:         self._running = True
Line 76: 
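
A minimal sketch of the start/stop state handling discussed here, assuming a single lock guards the running flag. `Lifecycle` and `AlreadyStarted` are hypothetical names used only for this illustration; worker creation is left out.

```python
import threading


class AlreadyStarted(Exception):
    """Hypothetical error: start() called on a running executor."""


class Lifecycle(object):
    """Hypothetical reduction of the executor to its running flag."""

    def __init__(self):
        self._lock = threading.Lock()
        self._running = False

    def start(self):
        with self._lock:
            if self._running:
                raise AlreadyStarted()
            # Workers would be created here, still under the lock.
            self._running = True

    def stop(self):
        # The flag changes under the same lock that start() takes.
        with self._lock:
            self._running = False

    @property
    def running(self):
        with self._lock:
            return self._running
```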


Line 74:                 self._add_worker()
Line 75:         self._running = True
Line 76: 
Line 77:     def stop(self):
Line 78:         self._running = False
> Must be inside the lock.
Done
Line 79:         for worker in self._workers:
Line 80:             self._tasks.put((_STOP, (), {}))
Line 81:         with self._resize_lock:
Line 82:             for worker in self._workers:


Line 79:         for worker in self._workers:
Line 80:             self._tasks.put((_STOP, (), {}))
Line 81:         with self._resize_lock:
Line 82:             for worker in self._workers:
Line 83:                 self._log.debug('terminating worker: %s', worker.name)
> This is not correct, you are not terminating the worker here. Maybe: ("Work
Picked 'Worker %s stopped'
Line 84:                 worker.join()
Line 85:                 del worker
Line 86: 
Line 87:     @property


Line 80:             self._tasks.put((_STOP, (), {}))
Line 81:         with self._resize_lock:
Line 82:             for worker in self._workers:
Line 83:                 self._log.debug('terminating worker: %s', worker.name)
Line 84:                 worker.join()
> This can take lot of time - should be done only if the caller specified wai
Done
Line 85:                 del worker
Line 86: 
Line 87:     @property
Line 88:     def size(self):
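
One possible shape for a stop() that joins the workers only on request, as a sketch under stated assumptions: `MiniExecutor`, the `wait` parameter name and its default are hypothetical, the code targets Python 3 (`queue` instead of `Queue`), and stop() returns the stopped threads only so that the example can be checked.

```python
import queue
import threading

_STOP = object()  # sentinel telling a worker to exit its loop


class MiniExecutor(object):
    """Hypothetical, stripped-down executor for illustration."""

    def __init__(self, workers=2):
        self._lock = threading.Lock()
        self._tasks = queue.Queue()
        self._workers = []
        self._running = False
        self._count = workers

    def start(self):
        with self._lock:
            for _ in range(self._count):
                worker = threading.Thread(target=self._run)
                worker.daemon = True
                worker.start()
                self._workers.append(worker)
            self._running = True

    def stop(self, wait=True):
        with self._lock:
            self._running = False
            workers, self._workers = self._workers, []
        # One sentinel per worker; each worker consumes exactly one.
        for _ in workers:
            self._tasks.put(_STOP)
        if wait:
            # Join outside the lock so other callers are not blocked
            # while slow workers finish.
            for worker in workers:
                worker.join()
        return workers

    def _run(self):
        while True:
            task = self._tasks.get()
            if task is _STOP:
                break
            task()
```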


Line 84:                 worker.join()
Line 85:                 del worker
Line 86: 
Line 87:     @property
Line 88:     def size(self):
> If you go with "size" here, then the __init__ argument should be also "size
No immediate need, so removed.
Line 89:         """
Line 90:         returns the actual size of the worker pool.
Line 91:         """
Line 92:         with self._resize_lock:


Line 91:         """
Line 92:         with self._resize_lock:
Line 93:             return len(self._workers)
Line 94: 
Line 95:     def send(self, task, *args, **kwargs):
> send? I don't understand this api.
Ok for the naming. Will use 'dispatch'.
Agreed for *args and **kwargs: no immediate need, let's start without them.
Line 96:         """
Line 97:         send a new task to the pool.
Line 98: 
Line 99:         The task may be any callable. extra arguments are


Line 106:         See doc(BackgroundTask.discard)
Line 107:         """
Line 108:         return self.submit(_COLLECTOR, task, *args, **kwargs)
Line 109: 
Line 110:     def submit(self, collector, task, *args, **kwargs):
> submit? I don't understand this api.
Well, in retrospect yes, those are ugly names. I needed somewhat similar names
to dispatch a task with and without a collector callable. But let's just get
rid of the collector until there is an objective need for it (if ever).
Line 111:         """
Line 112:         submit a task to the pool, with a collector function.
Line 113: 
Line 114:         The collector function must be a callable which


Line 122:         The collector will be called only if the task is not
Line 123:         discarded.
Line 124:         See doc(BackgroundTask.discard)
Line 125:         """
Line 126:         if not self._running:
> Should be protected by the lock
Done
Line 127:             raise PoolNotRunning
Line 128:         if not self._workers:
Line 129:             raise NoAvailableWorker
Line 130:         task = BackgroundTask(task, self, collector)


Line 124:         See doc(BackgroundTask.discard)
Line 125:         """
Line 126:         if not self._running:
Line 127:             raise PoolNotRunning
Line 128:         if not self._workers:
> Must be protected by the lock
Done
Line 129:             raise NoAvailableWorker
Line 130:         task = BackgroundTask(task, self, collector)
Line 131:         self._tasks.put((task, args, kwargs))
Line 132:         return task
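
A sketch of both checks done under the lock, assuming the renamed `ExecutorNotRunning` exception and the `dispatch` name agreed in this review. The `Dispatcher` class is a hypothetical reduction, and the queue here is unbounded so that put() cannot block while the lock is held.

```python
import queue
import threading


class ExecutorNotRunning(Exception):
    """Executor not yet started or shutting down."""


class NoAvailableWorker(Exception):
    """No available worker to carry the task."""


class Dispatcher(object):
    """Hypothetical reduction of the executor to its dispatch path."""

    def __init__(self):
        self._lock = threading.Lock()
        self._tasks = queue.Queue()
        self._workers = []
        self._running = False

    def dispatch(self, task):
        # Both the flag and the workers list are read under the lock,
        # so a concurrent stop() cannot slip in between the checks.
        with self._lock:
            if not self._running:
                raise ExecutorNotRunning()
            if not self._workers:
                raise NoAvailableWorker()
            self._tasks.put(task)
```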


Line 128:         if not self._workers:
Line 129:             raise NoAvailableWorker
Line 130:         task = BackgroundTask(task, self, collector)
Line 131:         self._tasks.put((task, args, kwargs))
Line 132:         return task
> Keeping tuples of (task, args, kwargs) means that any object references fro
Agreed: will get rid of the explicit args and kwargs and pack the state in the object.
Line 133: 
Line 134:     def _run(self):
Line 135:         self._log.info('worker started')
Line 136:         while True:
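
To illustrate packing the state in the object: a sketch of a task that carries its own arguments, so the queue holds a single callable instead of a (task, args, kwargs) tuple. `Sampling`, `vm_id` and `results` are hypothetical names made up for this example.

```python
class Sampling(object):
    """Hypothetical task that carries its own state."""

    def __init__(self, vm_id, results):
        self._vm_id = vm_id
        self._results = results

    def __call__(self):
        # The worker calls task() with no arguments; everything the
        # task needs was stored at construction time.
        self._results.append(self._vm_id)
```

With tasks of this shape, the worker loop reduces to fetching one object from the queue and calling it.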


Line 129:             raise NoAvailableWorker
Line 130:         task = BackgroundTask(task, self, collector)
Line 131:         self._tasks.put((task, args, kwargs))
Line 132:         return task
Line 133: 
> Must be protected with @traceback
Done
Line 134:     def _run(self):
Line 135:         self._log.info('worker started')
Line 136:         while True:
Line 137:             task, args, kwargs = self._tasks.get()


Line 134:     def _run(self):
Line 135:         self._log.info('worker started')
Line 136:         while True:
Line 137:             task, args, kwargs = self._tasks.get()
Line 138:             if task is _STOP:
> This should be:
Done
Line 139:                 break
Line 140:             try:
Line 141:                 with running(task):
Line 142:                     task(*args, **kwargs)


Line 138:             if task is _STOP:
Line 139:                 break
Line 140:             try:
Line 141:                 with running(task):
Line 142:                     task(*args, **kwargs)
> Should be:
Done
Line 143:             except Exception:
Line 144:                 self._log.exception('unhandled exception for task')
Line 145:             if task.discarded:
Line 146:                 self._log.debug('worker aborting')


Line 140:             try:
Line 141:                 with running(task):
Line 142:                     task(*args, **kwargs)
Line 143:             except Exception:
Line 144:                 self._log.exception('unhandled exception for task')
> unhandled -> Unhandled
Done
Line 145:             if task.discarded:
Line 146:                 self._log.debug('worker aborting')
Line 147:                 break
Line 148:         self._log.info('worker stopped')


Line 142:                     task(*args, **kwargs)
Line 143:             except Exception:
Line 144:                 self._log.exception('unhandled exception for task')
Line 145:             if task.discarded:
Line 146:                 self._log.debug('worker aborting')
> Lets be more clear: "Worker stopping after task was discarded"
Done
Line 147:                 break
Line 148:         self._log.info('worker stopped')
Line 149: 
Line 150:     def remove_worker(self, worker):


Line 144:                 self._log.exception('unhandled exception for task')
Line 145:             if task.discarded:
Line 146:                 self._log.debug('worker aborting')
Line 147:                 break
Line 148:         self._log.info('worker stopped')
> I think we should handle removal of workers here:
This will lead to better code, so I will do it.
Line 149: 
Line 150:     def remove_worker(self, worker):
Line 151:         "to be used from BackgroundTask only."
Line 152:         with self._resize_lock:


Line 146:                 self._log.debug('worker aborting')
Line 147:                 break
Line 148:         self._log.info('worker stopped')
Line 149: 
Line 150:     def remove_worker(self, worker):
> This is little ugly - people will use this eventually. Just make this _prot
Agreed and done
Line 151:         "to be used from BackgroundTask only."
Line 152:         with self._resize_lock:
Line 153:             if not self._running:
Line 154:                 return  # no point in messing up things if shutting down


Line 166:             worker = threading.Thread(target=self._run)
Line 167:             worker.daemon = True
Line 168:             worker.start()
Line 169:         except Exception:
Line 170:             self._log.exception('cannot create worker')
> Let not handle this here, and let the caller handle this error.
Done
Line 171:         else:
Line 172:             self._workers.append(worker)
Line 173:             self._log.debug('worker %s joined the pool', worker.name)
Line 174: 


Line 186:     def __init__(self, task, executor, collector):
Line 187:         self._task = task
Line 188:         self._executor = executor
Line 189:         self._collector = collector
Line 190:         self._valid = True
> I don't understand why we need a collector. The task should simply run, if 
No immediate or obvious need for the collector, just a maybe-nice-to-have 
feature. Thus removed and simplified the code.
Line 191:         self._thread = None
Line 192: 
Line 193:     @property
Line 194:     def in_progress(self):


Line 217:         self._valid = False
Line 218:         if self.in_progress:
Line 219:             self._executor.remove_worker(self._thread)
Line 220: 
Line 221:     # runnable interface: internal usage only
> This is nice, but internal stuff should be _protected.
Agreed and removed.
Line 222: 
Line 223:     def start(self):
Line 224:         self._thread = threading.current_thread()
Line 225: 


-- 
To view, visit http://gerrit.ovirt.org/29191
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2
Gerrit-PatchSet: 10
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <[email protected]>
Gerrit-Reviewer: Antoni Segura Puimedon <[email protected]>
Gerrit-Reviewer: Dan Kenigsberg <[email protected]>
Gerrit-Reviewer: Dima Kuznetsov <[email protected]>
Gerrit-Reviewer: Federico Simoncelli <[email protected]>
Gerrit-Reviewer: Francesco Romani <[email protected]>
Gerrit-Reviewer: Michal Skrivanek <[email protected]>
Gerrit-Reviewer: Nir Soffer <[email protected]>
Gerrit-Reviewer: Saggi Mizrahi <[email protected]>
Gerrit-Reviewer: Vinzenz Feenstra <[email protected]>
Gerrit-Reviewer: Yaniv Bronhaim <[email protected]>
Gerrit-Reviewer: [email protected]
Gerrit-Reviewer: oVirt Jenkins CI Server
Gerrit-HasComments: Yes
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
