Nir Soffer has posted comments on this change.

Change subject: executor: introduce the executor library
......................................................................


Patch Set 11:

(21 comments)

I think that this is the right direction. To discard tasks, the code which 
submits the task should schedule a task.discard() call using the scheduler. The 
calling code should be able to tell the right timeout for this.

There are various issues in the implementation. See the inline comments.

To shorten the review cycle, I posted a modified version that shows how all the 
issues can be solved:
http://pastebin.com/xevn4b28

http://gerrit.ovirt.org/#/c/29191/11/lib/vdsm/executor.py
File lib/vdsm/executor.py:

Line 28: import logging
Line 29: import Queue
Line 30: import threading
Line 31: 
Line 32: import utils
We can use now relative imports:

    from . import utils
Line 33: 
Line 34: 
Line 35: class ExecutorNotRunning(Exception):
Line 36:     """


Line 31: 
Line 32: import utils
Line 33: 
Line 34: 
Line 35: class ExecutorNotRunning(Exception):
"executor.ExecutorNotRunning" repeat the "Executor" word for no reason. We have 
namespaces in Python, so there is no need to repeat stuff.
Line 36:     """
Line 37:     Executor not yet started or shutting down.
Line 38:     """
Line 39: 


Line 37:     Executor not yet started or shutting down.
Line 38:     """
Line 39: 
Line 40: 
Line 41: class ExecutorAlreadyStarted(Exception):
"executor.ExecutorAllreadyStarted" repeat the "Executor" word for no reason.
Line 42:     """
Line 43:     Executor started multiple times.
Line 44:     """
Line 45: 


Line 51:     """
Line 52: 
Line 53:     _log = logging.getLogger("Executor")
Line 54: 
Line 55:     def __init__(self, workers=10, max_tasks=1000):
I don't think we need defaults, since we don't know how the executor is going 
to be used. The caller must specify this.
Line 56:         self._workers_count = workers
Line 57:         self._tasks = Queue.Queue(max_tasks)
Line 58:         self._workers = []
Line 59:         self._lock = threading.Lock()


Line 54: 
Line 55:     def __init__(self, workers=10, max_tasks=1000):
Line 56:         self._workers_count = workers
Line 57:         self._tasks = Queue.Queue(max_tasks)
Line 58:         self._workers = []
Let's keep the workers in a set(); it makes it easier to remove them when needed.
Line 59:         self._lock = threading.Lock()
Line 60:         self._running = False
Line 61:         self._discards = 0  # for debug purposes only
Line 62: 


Line 57:         self._tasks = Queue.Queue(max_tasks)
Line 58:         self._workers = []
Line 59:         self._lock = threading.Lock()
Line 60:         self._running = False
Line 61:         self._discards = 0  # for debug purposes only
I don't think we need this; logging each discard operation is enough.
Line 62: 
Line 63:     def start(self):
Line 64:         with self._lock:
Line 65:             if self._running:


Line 65:             if self._running:
Line 66:                 raise ExecutorAlreadyStarted
Line 67:             for _ in range(self._workers_count):
Line 68:                 self._add_worker()
Line 69:             self._running = True
Let's set self._running before starting the threads, which may use this value.
Line 70: 
Line 71:     def stop(self, wait=True):
Line 72:         for worker in self._workers:
Line 73:             self._tasks.put(_STOP)
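The ordering suggested above can be sketched like this (a minimal sketch with hypothetical names such as AlreadyStarted, not the patch itself):

```python
import threading


class AlreadyStarted(Exception):
    pass


class Executor(object):

    def __init__(self, workers_count):
        self._workers_count = workers_count
        self._workers = set()
        self._lock = threading.Lock()
        self._running = False

    def start(self):
        with self._lock:
            if self._running:
                raise AlreadyStarted()
            # Set the flag before spawning the threads, since a worker
            # may check self._running as soon as it starts.
            self._running = True
            for _ in range(self._workers_count):
                self._add_worker()

    def _add_worker(self):
        worker = threading.Thread(target=self._run)
        worker.daemon = True
        worker.start()
        self._workers.add(worker)

    def _run(self):
        pass  # worker loop elided
```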


Line 69:             self._running = True
Line 70: 
Line 71:     def stop(self, wait=True):
Line 72:         for worker in self._workers:
Line 73:             self._tasks.put(_STOP)
This may block if the tasks queue is full, so this is not a good way to stop the 
threads.

This will stop all threads *after* pending tasks are executed, which can cause 
threads to get stuck, and this method will never return when using wait=True.

I think that we would like to stop all waiting threads *now*, and drop pending 
tasks.

To do this, we need to clear the queue and signal all threads to exit. This is 
not possible when using the builtin Queue. Queue contains many features which 
we don't need (task_done, join), but not what we need (clear).
Line 74:         with self._lock:
Line 75:             self._running = False
Line 76:             if wait:
Line 77:                 for worker in self._workers:
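One way to get a queue that can be cleared is to build a minimal one on top of a deque and a condition variable; this is a sketch under assumed names (TaskQueue, TooManyTasks), not the builtin Queue:

```python
import collections
import threading


class TooManyTasks(Exception):
    pass


class TaskQueue(object):
    """Minimal queue with a non-blocking bounded put, a blocking get,
    and the clear() operation that the builtin Queue lacks."""

    def __init__(self, max_tasks):
        self._max_tasks = max_tasks
        self._tasks = collections.deque()
        self._cond = threading.Condition(threading.Lock())

    def put(self, task):
        # Never block the caller: dispatch must stay asynchronous.
        with self._cond:
            if len(self._tasks) == self._max_tasks:
                raise TooManyTasks()
            self._tasks.append(task)
            self._cond.notify()

    def get(self):
        with self._cond:
            while not self._tasks:
                self._cond.wait()
            return self._tasks.popleft()

    def clear(self):
        # Drop pending tasks so stop() does not wait for them.
        with self._cond:
            self._tasks.clear()
```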


Line 72:         for worker in self._workers:
Line 73:             self._tasks.put(_STOP)
Line 74:         with self._lock:
Line 75:             self._running = False
Line 76:             if wait:
This will deadlock, because you hold the lock while exiting threads may try to 
acquire it.

Also, this wait is too dangerous, since a thread which is running a task may get 
stuck, so a thread which is already running a task may (practically) never exit.

We would like to wait only for threads which are not executing any task.
Line 77:                 for worker in self._workers:
Line 78:                     self._log.debug('worker %s stopped', worker.name)
Line 79:                     worker.join()
Line 80:                     del worker


Line 94:         with self._lock:
Line 95:             if not self._running:
Line 96:                 raise ExecutorNotRunning
Line 97:         task = BackgroundTask(task, self)
Line 98:         self._tasks.put(task)
This may raise, since you released the lock above, so another thread may have just 
stopped the executor.

Also, adding a task may block if the queue is full. This breaks other code that 
assumes that dispatch is async - we never want to do this.

We should use put_nowait() here to ensure that this never blocks. The caller of 
dispatch should handle the Queue.Full exception that this raises.
Line 99:         return task
Line 100: 
Line 101:     @utils.traceback()
Line 102:     def _run(self):
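A dispatch() along those lines, doing the check and the put under the same lock (a sketch with hypothetical names; the module is spelled Queue on the Python 2 tree this patch targets):

```python
import queue      # "Queue" in Python 2
import threading


class NotRunning(Exception):
    pass


class Executor(object):

    def __init__(self, max_tasks):
        self._tasks = queue.Queue(max_tasks)
        self._lock = threading.Lock()
        self._running = True

    def dispatch(self, callable):
        with self._lock:
            # Check and enqueue under the same lock, so a concurrent
            # stop() cannot slip in between the check and the put.
            if not self._running:
                raise NotRunning()
            # put_nowait() never blocks; a full queue raises
            # queue.Full, which the caller of dispatch() must handle.
            self._tasks.put_nowait(callable)
```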


Line 108:             try:
Line 109:                 task.execute()
Line 110:             except Exception:
Line 111:                 self._log.exception('Unhandled exception for task')
Line 112:             if task.discarded:
This can be simplified by raising in task.execute() if the task was discarded.
Line 113:                 self._log.debug('Worker stopping after task was discarded')
Line 114:                 break
Line 115:         self._log.info('worker stopped')
Line 116:         if task.discarded:


Line 112:             if task.discarded:
Line 113:                 self._log.debug('Worker stopping after task was discarded')
Line 114:                 break
Line 115:         self._log.info('worker stopped')
Line 116:         if task.discarded:
We don't need this check, stop does not remove workers from the workers list.
Line 117:             self._remove_worker(threading.current_thread())
Line 118:         # else no action needed: stop(wait=True) will take care.
Line 119: 
Line 120:     def _remove_worker(self, worker):


Line 122:             self._workers.remove(worker)
Line 123:         self._log.debug('worker %s left the executor', worker.name)
Line 124: 
Line 125:     def _add_worker(self):
Line 126:         worker = threading.Thread(target=self._run)
Name this thread - to make it easy to understand the logs, I would keep a 
counter and call the threads "Executor-N".
Line 127:         worker.daemon = True
Line 128:         worker.start()
Line 129:         self._workers.append(worker)
Line 130:         self._log.debug('worker %s joined the executor', worker.name)
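The naming scheme could look like this (a minimal sketch; the counter attribute and names are assumptions):

```python
import itertools
import threading


class Executor(object):

    # Shared counter so every worker ever created gets a unique number.
    _counter = itertools.count(1)

    def __init__(self):
        self._workers = set()

    def _add_worker(self):
        # Name each thread "Executor-N" so log lines can be attributed
        # to a specific worker.
        name = "Executor-%d" % next(self._counter)
        worker = threading.Thread(target=self._run, name=name)
        worker.daemon = True
        worker.start()
        self._workers.add(worker)

    def _run(self):
        pass  # worker loop elided
```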


Line 131: 
Line 132:     def _replace_worker(self):
Line 133:         with self._lock:
Line 134:             self._discards += 1
Line 135:             self._add_worker()
This should not add a worker if the executor was stopped.
Line 136: 
Line 137: 
Line 138: class BackgroundTask(object):
Line 139:     """


Line 134:             self._discards += 1
Line 135:             self._add_worker()
Line 136: 
Line 137: 
Line 138: class BackgroundTask(object):
More specific name would be ExecutorTask.
Line 139:     """
Line 140:     Represents a task submitted to the Executor.
Line 141:     The client can
Line 142:     - verify if a task is in progress or not


Line 147:     """
Line 148:     def __init__(self, task, executor):
Line 149:         self._task = task
Line 150:         self._executor = executor
Line 151:         self._thread = None
We don't need the actual thread, keeping a boolean is enough.
Line 152:         self._discarded = False
Line 153: 
Line 154:     @property
Line 155:     def discarded(self):


Line 159:     def in_progress(self):
Line 160:         """
Line 161:         returns True if this task is in progress, False otherwise.
Line 162:         """
Line 163:         return self._thread is not None
This may be a task waiting on the queue because all workers are busy, or a task 
that was already executed.

Why do we need this racy check?
Line 164: 
Line 165:     def discard(self):
Line 166:         """
Line 167:         This is to be called once the task is detected as stuck.


Line 172:         It is not possible to interrupt a stuck task, thus
Line 173:         any side effect will still take place.
Line 174:         """
Line 175:         self._discarded = True
Line 176:         self._executor._replace_worker()
This is racy - if a thread was just executing this task, discarding may come too 
late, so the thread will continue, and we will add another thread to the pool 
for no reason.

For example:

 1. thread 1 executes the task
 2. thread 2 calls discard
 3. thread 1 finishes the task
 4. thread 2 sets self._discarded and invokes _replace_worker

Also, calling an executor protected method is a little ugly. Since we don't care 
about the executor object, but only about _replace_worker, we can accept the 
method instead of the object.

Finally, discarding the same task twice will add 2 new workers instead of one.
Line 177: 
Line 178:     # task interface
Line 179: 
Line 180:     def execute(self):
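A discard() that accepts only the method and is safe to call twice could be sketched like this (hypothetical names, not the patch):

```python
import threading


class Task(object):
    """Wraps a submitted callable; discard() replaces the worker at
    most once, even if called multiple times."""

    def __init__(self, callable, replace_worker):
        self._callable = callable
        # Accept only the method we need, not the whole executor.
        self._replace_worker = replace_worker
        self._lock = threading.Lock()
        self._discarded = False

    def discard(self):
        with self._lock:
            if self._discarded:
                return  # discarding twice must not add two workers
            self._discarded = True
        self._replace_worker()
```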


Line 176:         self._executor._replace_worker()
Line 177: 
Line 178:     # task interface
Line 179: 
Line 180:     def execute(self):
Raising if this task is already executing would be nice - this is impossible and 
should raise AssertionError.
Line 181:         self._thread = threading.current_thread()
Line 182:         try:
Line 183:             self._task()
Line 184:         finally:


Line 181:         self._thread = threading.current_thread()
Line 182:         try:
Line 183:             self._task()
Line 184:         finally:
Line 185:             self._thread = None
This must release the reference to the task, since this reference prevents 
garbage collection of the task until the background task is collected.

Also, keeping a reference to the executor makes it harder for Python to collect 
the executor (executor -> tasks -> task -> executor).

We can simplify the worker thread logic by raising here if the task was discarded 
while executing.
Line 186: 
Line 187: 
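An execute() that releases the callable and raises when the task was discarded while running might look like this (a sketch; the Discarded exception name is an assumption):

```python
class Discarded(Exception):
    """Raised by execute() when the task was discarded while running,
    making the worker thread exit its loop."""


class Task(object):

    def __init__(self, callable):
        self._callable = callable
        self._discarded = False
        self._executing = False

    def discard(self):
        self._discarded = True

    def execute(self):
        if self._executing:
            raise AssertionError("task is already being executed")
        self._executing = True
        try:
            self._callable()
        finally:
            # Release the callable so it can be garbage collected even
            # if a reference to this Task object is kept around.
            self._callable = None
            self._executing = False
        if self._discarded:
            raise Discarded()
```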


Line 184:         finally:
Line 185:             self._thread = None
Line 186: 
Line 187: 
Line 188: _STOP = BackgroundTask(None, None)
This should be object() - nobody gets this back expecting the BackgroundTask 
interface, and we want it to break if someone tries to execute it.


-- 
To view, visit http://gerrit.ovirt.org/29191
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2
Gerrit-PatchSet: 11
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <[email protected]>
Gerrit-Reviewer: Antoni Segura Puimedon <[email protected]>
Gerrit-Reviewer: Dan Kenigsberg <[email protected]>
Gerrit-Reviewer: Dima Kuznetsov <[email protected]>
Gerrit-Reviewer: Federico Simoncelli <[email protected]>
Gerrit-Reviewer: Francesco Romani <[email protected]>
Gerrit-Reviewer: Michal Skrivanek <[email protected]>
Gerrit-Reviewer: Nir Soffer <[email protected]>
Gerrit-Reviewer: Saggi Mizrahi <[email protected]>
Gerrit-Reviewer: Vinzenz Feenstra <[email protected]>
Gerrit-Reviewer: Yaniv Bronhaim <[email protected]>
Gerrit-Reviewer: [email protected]
Gerrit-Reviewer: oVirt Jenkins CI Server
Gerrit-HasComments: Yes