Modified: trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/task_pool.py (276996 => 276997)
--- trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/task_pool.py 2021-05-04 23:58:22 UTC (rev 276996)
+++ trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/task_pool.py 2021-05-05 00:00:14 UTC (rev 276997)
@@ -30,17 +30,17 @@
from webkitcorepy import OutputCapture, Timeout, log
-class Message(object):
+class _Message(object):
def __init__(self, who=None):
- self.who = who or Process.name
+ self.who = who or _Process.name
def __call__(self, caller):
NotImplemented()
-class Task(Message):
+class _Task(_Message):
def __init__(self, function, id, *args, **kwargs):
- super(Task, self).__init__()
+ super(_Task, self).__init__()
self.function = function
self.id = id
@@ -51,9 +51,9 @@
return self.function(*self.args, **self.kwargs)
-class Result(Message):
+class _Result(_Message):
def __init__(self, value, id):
- super(Result, self).__init__()
+ super(_Result, self).__init__()
self.value = value
self.id = id
@@ -63,9 +63,9 @@
return self.value
-class Log(Message):
+class _Log(_Message):
def __init__(self, record):
- super(Log, self).__init__()
+ super(_Log, self).__init__()
self.record = record
def __call__(self, caller):
@@ -72,12 +72,12 @@
logging.getLogger(self.record.name).log(self.record.levelno, '{} {}'.format(self.who, self.record.getMessage()))
-class Print(Message):
+class _Print(_Message):
stdout = 1
stderr = 2
def __init__(self, lines, stream=stdout):
- super(Print, self).__init__()
+ super(_Print, self).__init__()
self.lines = lines
self.stream = stream
@@ -90,32 +90,32 @@
stream.write('{}\n'.format(line))
-class State(Message):
+class _State(_Message):
STARTING, STOPPING = 1, 0
STATES = [STARTING, STOPPING]
def __init__(self, state):
- super(State, self).__init__()
+ super(_State, self).__init__()
self.state = state
def __call__(self, caller):
log.info('{} {}'.format(
self.who, {
- State.STARTING: 'starting',
- State.STOPPING: 'stopping',
+ self.STARTING: 'starting',
+ self.STOPPING: 'stopping',
}.get(self.state, self.state),
))
if caller:
caller._started += {
- State.STARTING: 1,
- State.STOPPING: -1,
+ self.STARTING: 1,
+ self.STOPPING: -1,
}.get(self.state, 0)
return self.state
-class ChildException(Message):
+class _ChildException(_Message):
def __init__(self, exc_info=None):
- super(ChildException, self).__init__()
+ super(_ChildException, self).__init__()
self.exc_info = exc_info or sys.exc_info()
def __call__(self, caller):
@@ -128,7 +128,7 @@
reraise(*self.exc_info)
-class BiDirectionalQueue(object):
+class _BiDirectionalQueue(object):
def __init__(self, outgoing=None, incoming=None):
self.outgoing = outgoing or multiprocessing.Queue()
self.incoming = incoming or multiprocessing.Queue()
@@ -147,17 +147,18 @@
return self.incoming.get()
-class Process(object):
+class _Process(object):
name = None
working = False
+ queue = None
class LogHandler(logging.Handler):
def __init__(self, queue, **kwargs):
self._queue = queue
- super(Process.LogHandler, self).__init__(**kwargs)
+ super(_Process.LogHandler, self).__init__(**kwargs)
def emit(self, record):
- self._queue.send(Log(record))
+ self._queue.send(_Log(record))
class Stream(io.IOBase):
def __init__(self, handle, queue):
@@ -169,7 +170,7 @@
def flush(self):
if self.cache is not None:
- self._queue.send(Print(lines=[self.cache], stream=self.handle))
+ self._queue.send(_Print(lines=[self.cache], stream=self.handle))
self.cache = None
def writelines(self, lines):
@@ -185,7 +186,7 @@
elif c not in ['\r', '\0']:
self.cache = c if self.cache is None else (self.cache + c)
if to_be_printed:
- self._queue.send(Print(lines=to_be_printed, stream=self.handle))
+ self._queue.send(_Print(lines=to_be_printed, stream=self.handle))
return len(data)
@property
@@ -252,9 +253,10 @@
logger.addHandler(cls.LogHandler(queue))
logger.setLevel(loglevel)
- queue.send(State(State.STARTING))
+ cls.queue = queue
+ queue.send(_State(_State.STARTING))
- with OutputCapture.ReplaceSysStream('stderr', cls.Stream(Print.stderr, queue)), OutputCapture.ReplaceSysStream('stdout', cls.Stream(Print.stdout, queue)):
+ with OutputCapture.ReplaceSysStream('stderr', cls.Stream(_Print.stderr, queue)), OutputCapture.ReplaceSysStream('stdout', cls.Stream(_Print.stdout, queue)):
try:
pickling_support.install()
if setup:
@@ -264,11 +266,11 @@
task = queue.receive()
if not task:
break
- queue.send(Result(value=task(None), id=task.id))
+ queue.send(_Result(value=task(None), id=task.id))
except BaseException:
typ, exception, traceback = sys.exc_info()
- queue.send(ChildException(exc_info=(
+ queue.send(_ChildException(exc_info=(
typ, typ('{} (from {})'.format(str(exception), name)), traceback,
)))
@@ -277,11 +279,22 @@
teardown(*teardownargs, **teardownkwargs)
sys.stdout.flush()
sys.stderr.flush()
- queue.send(State(State.STOPPING))
+ queue.send(_State(_State.STOPPING))
+ cls.queue = None
class TaskPool(object):
+ Message = _Message
+ Task = _Task
+ Result = _Result
+ Log = _Log
+ Print = _Print
+ State = _State
+ ChildException = _ChildException
+ BiDirectionalQueue = _BiDirectionalQueue
+ Process = _Process
+
class Exception(RuntimeError):
pass
@@ -294,20 +307,20 @@
import tblib
name = name or 'worker'
- if name == Process.name:
+ if name == self.Process.name:
raise ValueError("Parent process is already named {}".format(name))
if workers < 1:
raise ValueError('TaskPool requires positive number of workers')
- self.queue = BiDirectionalQueue()
+ self.queue = self.BiDirectionalQueue()
self.workers = [multiprocessing.Process(
- target=Process.main,
+ target=self.Process.main,
args=(
'{}/{}'.format(name, count), logging.getLogger().getEffectiveLevel(),
setup, setupargs, setupkwargs,
- BiDirectionalQueue(outgoing=self.queue.incoming, incoming=self.queue.outgoing),
+ self.BiDirectionalQueue(outgoing=self.queue.incoming, incoming=self.queue.outgoing),
teardown, teardownargs, teardownkwargs,
),
) for count in range(workers)]
@@ -330,7 +343,7 @@
callback = kwargs.pop('callback', None)
if callback:
self.callbacks[self._id_count] = callback
- self.queue.send(Task(function, self._id_count, *args, **kwargs))
+ self.queue.send(self.Task(function, self._id_count, *args, **kwargs))
self._id_count += 1
# For every block of tasks passed to our workers, we need consume messages so we don't get deadlocked