Nir Soffer has posted comments on this change.

Change subject: executor: introduce the executor library
......................................................................


Patch Set 10:

(27 comments)

Nice, but this can be simplified and made more correct.

http://gerrit.ovirt.org/#/c/29191/10/lib/vdsm/executor.py
File lib/vdsm/executor.py:

Line 36:     pass
Line 37: 
Line 38: 
Line 39: def _COLLECTOR(result):
Line 40:     pass
Can you move private stuff like this to the end of the file? It is easier to 
understand how to use this when you see the public parts first.
Line 41: 
Line 42: 
Line 43: class NoAvailableWorker(Exception):
Line 44:     """


Line 45:     No available worker to carry the task.
Line 46:     """
Line 47: 
Line 48: 
Line 49: class PoolNotRunning(Exception):
Let's go with your choice and name it ExecutorNotRunning.
Line 50:     """
Line 51:     Pool not yet started or shutting down.
Line 52:     """
Line 53: 


Line 59:     """
Line 60: 
Line 61:     _log = logging.getLogger("Executor")
Line 62: 
Line 63:     def __init__(self, num_workers=10):
I think that "workers" or "size" is much more readable than the cryptic 
num_workers. If you want to be more verbose, maybe "worker_count".
Line 64:         self._num_workers = num_workers
Line 65:         self._tasks = Queue.Queue()
Line 66:         self._workers = []
Line 67:         self._resize_lock = threading.Lock()


Line 61:     _log = logging.getLogger("Executor")
Line 62: 
Line 63:     def __init__(self, num_workers=10):
Line 64:         self._num_workers = num_workers
Line 65:         self._tasks = Queue.Queue()
We need to limit the size - you don't want to hold thousands of tasks here.

When the tasks queue is full, you probably want to raise, so the caller can 
handle this without blocking.
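
A minimal sketch of what I mean, written against Python 3's queue module (the 
patch uses the Python 2 Queue module). TooManyTasks, max_tasks and dispatch 
are hypothetical names, not part of the patch:

```python
import queue


class TooManyTasks(Exception):
    """Hypothetical error: the tasks queue is full."""


class Executor(object):

    def __init__(self, max_tasks=100):
        # Bounded queue: holds at most max_tasks pending tasks.
        self._tasks = queue.Queue(max_tasks)

    def dispatch(self, task):
        try:
            # put_nowait() raises queue.Full instead of blocking.
            self._tasks.put_nowait(task)
        except queue.Full:
            raise TooManyTasks()
```

The caller can then catch the error and decide whether to retry, drop the 
task, or report the overload.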
Line 66:         self._workers = []
Line 67:         self._resize_lock = threading.Lock()
Line 68:         self._running = False
Line 69:         self._evictions = 0  # for debug purposes only


Line 63:     def __init__(self, num_workers=10):
Line 64:         self._num_workers = num_workers
Line 65:         self._tasks = Queue.Queue()
Line 66:         self._workers = []
Line 67:         self._resize_lock = threading.Lock()
I would call this _lock, since you need to protect more than the workers list.
Line 68:         self._running = False
Line 69:         self._evictions = 0  # for debug purposes only
Line 70: 
Line 71:     def start(self):


Line 68:         self._running = False
Line 69:         self._evictions = 0  # for debug purposes only
Line 70: 
Line 71:     def start(self):
Line 72:         with self._resize_lock:
You should raise if the executor is started multiple times.
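
Something along these lines, as a rough sketch only. AlreadyStarted is a 
hypothetical exception name, and worker creation is left out to keep the 
example short:

```python
import threading


class AlreadyStarted(Exception):
    """Hypothetical error: start() was called on a running executor."""


class Executor(object):

    def __init__(self, workers=10):
        self._workers_count = workers
        self._lock = threading.Lock()
        self._running = False

    def start(self):
        with self._lock:
            if self._running:
                raise AlreadyStarted()
            # Worker creation is omitted in this sketch.
            # The flag is checked and set under the same lock.
            self._running = True
```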
Line 73:             for _ in range(self._num_workers):
Line 74:                 self._add_worker()
Line 75:         self._running = True
Line 76: 


Line 74:                 self._add_worker()
Line 75:         self._running = True
Line 76: 
Line 77:     def stop(self):
Line 78:         self._running = False
Must be inside the lock.
Line 79:         for worker in self._workers:
Line 80:             self._tasks.put((_STOP, (), {}))
Line 81:         with self._resize_lock:
Line 82:             for worker in self._workers:


Line 79:         for worker in self._workers:
Line 80:             self._tasks.put((_STOP, (), {}))
Line 81:         with self._resize_lock:
Line 82:             for worker in self._workers:
Line 83:                 self._log.debug('terminating worker: %s', worker.name)
This is not correct, you are not terminating the worker here. Maybe: ("Worker 
%s stopped", worker.name)
Line 84:                 worker.join()
Line 85:                 del worker
Line 86: 
Line 87:     @property


Line 80:             self._tasks.put((_STOP, (), {}))
Line 81:         with self._resize_lock:
Line 82:             for worker in self._workers:
Line 83:                 self._log.debug('terminating worker: %s', worker.name)
Line 84:                 worker.join()
This can take a lot of time - should be done only if the caller specified 
wait=True
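
A possible shape for this, as a sketch under a few assumptions: workers are 
started in __init__ only for brevity, the queue holds bare callables, and the 
code targets Python 3's queue module. The running flag is also cleared under 
the lock here:

```python
import queue
import threading

_STOP = object()  # sentinel telling a worker to exit


class Executor(object):

    def __init__(self, workers=2):
        self._tasks = queue.Queue()
        self._lock = threading.Lock()
        self._running = True
        self._workers = []
        for _ in range(workers):
            worker = threading.Thread(target=self._run)
            worker.daemon = True
            worker.start()
            self._workers.append(worker)

    def _run(self):
        while True:
            task = self._tasks.get()
            if task is _STOP:
                break
            task()

    def stop(self, wait=False):
        with self._lock:
            self._running = False
            workers = list(self._workers)
            # One sentinel per worker; the queue is unbounded here,
            # so put() does not block while holding the lock.
            for _ in workers:
                self._tasks.put(_STOP)
        if wait:
            # Join outside the lock; this may block for a long time
            # if a worker is stuck in a task.
            for worker in workers:
                worker.join()
```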
Line 85:                 del worker
Line 86: 
Line 87:     @property
Line 88:     def size(self):


Line 84:                 worker.join()
Line 85:                 del worker
Line 86: 
Line 87:     @property
Line 88:     def size(self):
If you go with "size" here, then the __init__ argument should also be "size".

But do we need this at all? Checking the size is racy, better not to report it.
Line 89:         """
Line 90:         returns the actual size of the worker pool.
Line 91:         """
Line 92:         with self._resize_lock:


Line 91:         """
Line 92:         with self._resize_lock:
Line 93:             return len(self._workers)
Line 94: 
Line 95:     def send(self, task, *args, **kwargs):
send? I don't understand this API.

I think we need one interface such as "dispatch" or "queue". send is a very 
strange interface for a thread pool.

I also prefer not to support args and kwargs, just accept a callable. If the 
user wants to keep additional state, they should send a callable keeping this 
state.

We can always add *args or even **kwargs if needed, let's start with a simpler 
version.
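
For illustration, a sketch of the callable-only interface, assuming the name 
"dispatch" is chosen. The sample function and its arguments are made up for 
the example; functools.partial is one way for the caller to carry state:

```python
import functools
import queue


class Executor(object):

    def __init__(self):
        self._tasks = queue.Queue()

    def dispatch(self, task):
        # Accept a single callable; no *args or **kwargs.
        self._tasks.put(task)


def sample(vm_id, timeout=10):
    # Hypothetical task function used only for this example.
    return (vm_id, timeout)


executor = Executor()
# The caller binds its own state before dispatching.
executor.dispatch(functools.partial(sample, "vm-1", timeout=5))
```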
Line 96:         """
Line 97:         send a new task to the pool.
Line 98: 
Line 99:         The task may be any callable. extra arguments are


Line 106:         See doc(BackgroundTask.discard)
Line 107:         """
Line 108:         return self.submit(_COLLECTOR, task, *args, **kwargs)
Line 109: 
Line 110:     def submit(self, collector, task, *args, **kwargs):
submit? I don't understand this api.
Line 111:         """
Line 112:         submit a task to the pool, with a collector function.
Line 113: 
Line 114:         The collector function must be a callable which


Line 122:         The collector will be called only if the task is not
Line 123:         discarded.
Line 124:         See doc(BackgroundTask.discard)
Line 125:         """
Line 126:         if not self._running:
Should be protected by the lock
Line 127:             raise PoolNotRunning
Line 128:         if not self._workers:
Line 129:             raise NoAvailableWorker
Line 130:         task = BackgroundTask(task, self, collector)


Line 124:         See doc(BackgroundTask.discard)
Line 125:         """
Line 126:         if not self._running:
Line 127:             raise PoolNotRunning
Line 128:         if not self._workers:
Must be protected by the lock
Line 129:             raise NoAvailableWorker
Line 130:         task = BackgroundTask(task, self, collector)
Line 131:         self._tasks.put((task, args, kwargs))
Line 132:         return task


Line 125:         """
Line 126:         if not self._running:
Line 127:             raise PoolNotRunning
Line 128:         if not self._workers:
Line 129:             raise NoAvailableWorker
We don't need to check for no workers - the executor should keep this list 
full, and we don't care if in some point in time it is empty, since the 
executor will fill it back soon.
Line 130:         task = BackgroundTask(task, self, collector)
Line 131:         self._tasks.put((task, args, kwargs))
Line 132:         return task
Line 133: 


Line 128:         if not self._workers:
Line 129:             raise NoAvailableWorker
Line 130:         task = BackgroundTask(task, self, collector)
Line 131:         self._tasks.put((task, args, kwargs))
Line 132:         return task
Keeping tuples of (task, args, kwargs) means that any object referenced from 
task (e.g. self for a method), args or kwargs will not be collected until the 
task is executed. Since you wrap the task with the BackgroundTask, I think you 
should put the args and kwargs inside this object, and put it in the queue.

Then running or discarding can be localized into the BackgroundTask object.
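
Roughly like this, as a sketch only. The worker_loop function stands in for 
Executor._run, and dropping the references after the run is my assumption 
about what we want, not something the patch does today:

```python
import queue

_STOP = object()  # sentinel telling the worker loop to exit


class BackgroundTask(object):

    def __init__(self, task, args=(), kwargs=None):
        self._task = task
        self._args = args
        self._kwargs = kwargs or {}

    def execute(self):
        try:
            self._task(*self._args, **self._kwargs)
        finally:
            # Assumption: drop references as soon as the task has run.
            self._task = self._args = self._kwargs = None


def worker_loop(tasks):
    while True:
        task = tasks.get()  # a single object, not a tuple
        if task is _STOP:
            break
        task.execute()


results = []
tasks = queue.Queue()
tasks.put(BackgroundTask(results.append, ("done",)))
tasks.put(_STOP)
worker_loop(tasks)
```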
Line 133: 
Line 134:     def _run(self):
Line 135:         self._log.info('worker started')
Line 136:         while True:


Line 129:             raise NoAvailableWorker
Line 130:         task = BackgroundTask(task, self, collector)
Line 131:         self._tasks.put((task, args, kwargs))
Line 132:         return task
Line 133: 
Must be protected with @traceback
Line 134:     def _run(self):
Line 135:         self._log.info('worker started')
Line 136:         while True:
Line 137:             task, args, kwargs = self._tasks.get()


Line 134:     def _run(self):
Line 135:         self._log.info('worker started')
Line 136:         while True:
Line 137:             task, args, kwargs = self._tasks.get()
Line 138:             if task is _STOP:
This should be:

    task = self._tasks.get()
Line 139:                 break
Line 140:             try:
Line 141:                 with running(task):
Line 142:                     task(*args, **kwargs)


Line 138:             if task is _STOP:
Line 139:                 break
Line 140:             try:
Line 141:                 with running(task):
Line 142:                     task(*args, **kwargs)
Should be:

    task.execute()
Line 143:             except Exception:
Line 144:                 self._log.exception('unhandled exception for task')
Line 145:             if task.discarded:
Line 146:                 self._log.debug('worker aborting')


Line 140:             try:
Line 141:                 with running(task):
Line 142:                     task(*args, **kwargs)
Line 143:             except Exception:
Line 144:                 self._log.exception('unhandled exception for task')
unhandled -> Unhandled
Line 145:             if task.discarded:
Line 146:                 self._log.debug('worker aborting')
Line 147:                 break
Line 148:         self._log.info('worker stopped')


Line 141:                 with running(task):
Line 142:                     task(*args, **kwargs)
Line 143:             except Exception:
Line 144:                 self._log.exception('unhandled exception for task')
Line 145:             if task.discarded:
Nice and simple.
Line 146:                 self._log.debug('worker aborting')
Line 147:                 break
Line 148:         self._log.info('worker stopped')
Line 149: 


Line 142:                     task(*args, **kwargs)
Line 143:             except Exception:
Line 144:                 self._log.exception('unhandled exception for task')
Line 145:             if task.discarded:
Line 146:                 self._log.debug('worker aborting')
Let's be clearer: "Worker stopping after task was discarded"
Line 147:                 break
Line 148:         self._log.info('worker stopped')
Line 149: 
Line 150:     def remove_worker(self, worker):


Line 144:                 self._log.exception('unhandled exception for task')
Line 145:             if task.discarded:
Line 146:                 self._log.debug('worker aborting')
Line 147:                 break
Line 148:         self._log.info('worker stopped')
I think we should handle removal of workers here:

    with self._lock:
        me = threading.current_thread()
        self._workers.remove(me)

This can never fail since this is the only place removing from the workers list.

If we do this, size no longer reports the number of available threads, but you 
can do something like this:

    class BackgroundTask:

        def discard(self):
            self._thread.discarded = True
            self._executor._add_worker()

And if you'd like to query the state of the executor:

    def available_workers(self):
        with self._lock:
            return sum(1 for t in self._workers if not t.discarded)

    def discarded_workers(self):
        with self._lock:
            return sum(1 for t in self._workers if t.discarded)

Since the number of workers is small, this is fast enough.

This also allows waiting for all threads, including stuck threads, but we 
probably don't want this.

Anyway, having all threads, both active and stuck/discarded, in the same list 
will help to understand how this behaves.
Line 149: 
Line 150:     def remove_worker(self, worker):
Line 151:         "to be used from BackgroundTask only."
Line 152:         with self._resize_lock:


Line 146:                 self._log.debug('worker aborting')
Line 147:                 break
Line 148:         self._log.info('worker stopped')
Line 149: 
Line 150:     def remove_worker(self, worker):
This is a little ugly - people will use this eventually. Just make this 
_protected and use it from the background task.
Line 151:         "to be used from BackgroundTask only."
Line 152:         with self._resize_lock:
Line 153:             if not self._running:
Line 154:                 return  # no point in messing up things if shutting down


Line 166:             worker = threading.Thread(target=self._run)
Line 167:             worker.daemon = True
Line 168:             worker.start()
Line 169:         except Exception:
Line 170:             self._log.exception('cannot create worker')
Let's not handle this here, and let the caller handle this error.
Line 171:         else:
Line 172:             self._workers.append(worker)
Line 173:             self._log.debug('worker %s joined the pool', worker.name)
Line 174: 


Line 186:     def __init__(self, task, executor, collector):
Line 187:         self._task = task
Line 188:         self._executor = executor
Line 189:         self._collector = collector
Line 190:         self._valid = True
I don't understand why we need a collector. The task should simply run, if it 
should call some other function with some value, this should be implemented by 
the task function.

I don't think we should provide such services like the current storage thread 
pool.

This will allow removal of the collector, valid and one of the executor methods 
to dispatch tasks.
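
To show what I mean, a small sketch where the task itself reports its result. 
All the names here (collect, make_task) are hypothetical and only for 
illustration:

```python
results = []


def collect(value):
    # Whatever the caller wants to do with the result.
    results.append(value)


def make_task(func, on_result):
    # The task is a plain callable that calls on_result itself,
    # so the executor does not need to know about collectors.
    def task():
        on_result(func())
    return task


task = make_task(lambda: 42, collect)
task()  # in real use, a worker thread would call this
```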
Line 191:         self._thread = None
Line 192: 
Line 193:     @property
Line 194:     def in_progress(self):


Line 217:         self._valid = False
Line 218:         if self.in_progress:
Line 219:             self._executor.remove_worker(self._thread)
Line 220: 
Line 221:     # runnable interface: internal usage only
This is nice, but internal stuff should be _protected.

But we don't need this fancy running interface, because we can do:

    def execute(self):
        self._thread = threading.current_thread()
        try:
            self._task(*self._args, **self._kwargs)
        finally:
            self._thread = None
Line 222: 
Line 223:     def start(self):
Line 224:         self._thread = threading.current_thread()
Line 225: 


-- 
To view, visit http://gerrit.ovirt.org/29191
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2
Gerrit-PatchSet: 10
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <[email protected]>
Gerrit-Reviewer: Antoni Segura Puimedon <[email protected]>
Gerrit-Reviewer: Dan Kenigsberg <[email protected]>
Gerrit-Reviewer: Dima Kuznetsov <[email protected]>
Gerrit-Reviewer: Federico Simoncelli <[email protected]>
Gerrit-Reviewer: Francesco Romani <[email protected]>
Gerrit-Reviewer: Michal Skrivanek <[email protected]>
Gerrit-Reviewer: Nir Soffer <[email protected]>
Gerrit-Reviewer: Saggi Mizrahi <[email protected]>
Gerrit-Reviewer: Vinzenz Feenstra <[email protected]>
Gerrit-Reviewer: Yaniv Bronhaim <[email protected]>
Gerrit-Reviewer: [email protected]
Gerrit-Reviewer: oVirt Jenkins CI Server
Gerrit-HasComments: Yes
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches