Charles-François Natali added the comment:
I'm attaching proof-of-concept code for a ScheduledExecutor
interface and a ScheduledThreadPoolExecutor implementation
(unfortunately I can't upload it as a Mercurial diff for now).
Here's what the API looks like:
"""
from concurrent.futures import ScheduledThreadPoolExecutor
import time


def say(text):
    """Print *text* prefixed with the current wall-clock time."""
    print("{}: {}".format(time.ctime(), text))


# NOTE(review): indentation was lost in the archived message; the extent of
# the ``with`` block below is reconstructed -- confirm against test.py.
with ScheduledThreadPoolExecutor(5) as p:
    # One-shot task, run once after a 1 second delay.
    p.schedule(1, say, 'hello 1')
    # Fixed-rate task: no initial delay, period of 2 seconds.
    f = p.schedule_fixed_rate(0, 2, say, 'hello 2')
    # Fixed-delay task: no initial delay, 3 seconds between invocations.
    p.schedule_fixed_delay(0, 3, say, 'hello 3')
    time.sleep(6)
    say("cancelling: %s" % f)
    f.cancel()
    time.sleep(10)
    say("shutting down")
"""
schedule() is for one-shot execution, schedule_fixed_rate() is for
fixed-rate scheduling (i.e. there will be no drift due to the task
execution time), and schedule_fixed_delay() is for fixed-delay
scheduling (i.e. there will always be a fixed amount of time between
the end of one invocation and the start of the next).
Random notes:
- the scheduling is handled by a new SchedQueue in the queue module.
The sched module would have been useful, but it can't be used here: it
stops as soon as the queue is empty, and while it is blocked in its wait
function it won't wake up if a new task is enqueued, etc. Also, I guess
such a queue could be useful in general.
- I had to create a DelayedFuture subclass, which is returned by the
schedule_XXX methods. The main differences from a raw Future are that it
has scheduled-time and period attributes, and that it supports
re-arming (a plain Future can only be run once). It can be cancelled,
and it also supports result/exception retrieval.
- I don't know if a process-based counterpart
(ScheduledProcessPoolExecutor) is really useful. I didn't look at it
for now.
----------
Added file: http://bugs.python.org/file30199/scheduled.diff
Added file: http://bugs.python.org/file30200/test.py
_______________________________________
Python tracker <rep...@bugs.python.org>
<http://bugs.python.org/issue995907>
_______________________________________
diff -ur cpython.orig/Lib/concurrent/futures/_base.py
cpython-b3e1be1493a5/Lib/concurrent/futures/_base.py
--- cpython.orig/Lib/concurrent/futures/_base.py 2013-05-07
08:21:21.000000000 +0000
+++ cpython-b3e1be1493a5/Lib/concurrent/futures/_base.py 2013-05-10
16:35:16.000000000 +0000
@@ -6,7 +6,10 @@
import collections
import logging
import threading
-import time
+try:
+ from time import monotonic as time
+except ImportError:
+ from time import time
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
@@ -188,7 +191,7 @@
before the given timeout.
"""
if timeout is not None:
- end_time = timeout + time.time()
+ end_time = timeout + time()
with _AcquireFutures(fs):
finished = set(
@@ -204,7 +207,7 @@
if timeout is None:
wait_timeout = None
else:
- wait_timeout = end_time - time.time()
+ wait_timeout = end_time - time()
if wait_timeout < 0:
raise TimeoutError(
'%d (of %d) futures unfinished' % (
@@ -390,11 +393,11 @@
elif self._state == FINISHED:
return self.__get_result()
- self._condition.wait(timeout)
+ gotit = self._condition.wait(timeout)
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
- elif self._state == FINISHED:
+ elif gotit:
return self.__get_result()
else:
raise TimeoutError()
@@ -423,11 +426,11 @@
elif self._state == FINISHED:
return self._exception
- self._condition.wait(timeout)
+ gotit = self._condition.wait(timeout)
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
- elif self._state == FINISHED:
+ elif gotit:
return self._exception
else:
raise TimeoutError()
@@ -499,6 +502,39 @@
self._condition.notify_all()
self._invoke_callbacks()
+
+class DelayedFuture(Future):
+ """A future whose execution can be delayed, and periodic."""
+
+ def __init__(self, sched_time, period=0, delay=0):
+ super().__init__()
+ self._sched_time = sched_time
+ if period > 0:
+ # step > 0 => fixed rate
+ self._step = period
+ elif delay > 0:
+ # step < 0 => fixed delay
+ self._step = -delay
+ else:
+ # step == 0 => one-shot
+ self._step = 0
+
+ def is_periodic(self):
+ return self._step != 0
+
+ def get_sched_time(self):
+ return self._sched_time
+
+ def rearm(self):
+ """Re-arm the future, and update its scheduled execution time."""
+ with self._condition:
+ if self._step > 0:
+ self._sched_time += self._step
+ else:
+ self._sched_time = time() - self._step
+ self._state = PENDING
+
+
class Executor(object):
"""This is an abstract base class for concrete asynchronous executors."""
@@ -532,7 +568,7 @@
Exception: If fn(*args) raises for any values.
"""
if timeout is not None:
- end_time = timeout + time.time()
+ end_time = timeout + time()
fs = [self.submit(fn, *args) for args in zip(*iterables)]
@@ -544,7 +580,7 @@
if timeout is None:
yield future.result()
else:
- yield future.result(end_time - time.time())
+ yield future.result(end_time - time())
finally:
for future in fs:
future.cancel()
@@ -569,3 +605,15 @@
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown(wait=True)
return False
+
+
+class ScheduledExecutor(Executor):
+
+ def schedule(self, delay, fn, *args, **kwargs):
+ raise NotImplementedError()
+
+ def schedule_fixed_rate(self, init_delay, period, fn, *args, **kwargs):
+ raise NotImplementedError()
+
+ def schedule_fixed_delay(self, init_delay, delay, fn, *args, **kwargs):
+ raise NotImplementedError()
diff -ur cpython.orig/Lib/concurrent/futures/__init__.py
cpython-b3e1be1493a5/Lib/concurrent/futures/__init__.py
--- cpython.orig/Lib/concurrent/futures/__init__.py 2013-05-07
08:21:21.000000000 +0000
+++ cpython-b3e1be1493a5/Lib/concurrent/futures/__init__.py 2013-05-10
15:29:04.000000000 +0000
@@ -15,4 +15,4 @@
wait,
as_completed)
from concurrent.futures.process import ProcessPoolExecutor
-from concurrent.futures.thread import ThreadPoolExecutor
+from concurrent.futures.thread import ScheduledThreadPoolExecutor, ThreadPoolExecutor
diff -ur cpython.orig/Lib/concurrent/futures/thread.py
cpython-b3e1be1493a5/Lib/concurrent/futures/thread.py
--- cpython.orig/Lib/concurrent/futures/thread.py 2013-05-07
08:21:21.000000000 +0000
+++ cpython-b3e1be1493a5/Lib/concurrent/futures/thread.py 2013-05-10
16:37:16.000000000 +0000
@@ -9,6 +9,10 @@
from concurrent.futures import _base
import queue
import threading
+try:
+ from time import monotonic as time
+except ImportError:
+ from time import time
import weakref
# Workers are created as daemon threads. This is done to allow the interpreter
@@ -57,6 +61,39 @@
else:
self.future.set_result(result)
+class _DelayedWorkItem(_WorkItem):
+
+ def __init__(self, queue, future, fn, args, kwargs):
+ super().__init__(future, fn, args, kwargs)
+ self.queue = queue
+
+ def run(self):
+ if not self.future.set_running_or_notify_cancel():
+ return
+
+ try:
+ result = self.fn(*self.args, **self.kwargs)
+ except BaseException as e:
+ self.future.set_exception(e)
+ else:
+ self.future.set_result(result)
+ if self.future.is_periodic():
+ # rearm the future - it also updates its scheduled time
+ self.future.rearm()
+ # and re-schedule ourselves - XXX don't reschedule if the pool
+ # is shut down
+ self.queue.put(self)
+
+
+class _DelayedWorkItemQueue(queue.SchedQueue):
+
+ def put(self, w, block=True, timeout=None):
+ if w is not None:
+ super().put_abs(w.future.get_sched_time(), w, block, timeout)
+ else:
+ super().put_abs(0, None, block, timeout)
+
+
def _worker(executor_reference, work_queue):
try:
while True:
@@ -130,3 +167,31 @@
for t in self._threads:
t.join()
shutdown.__doc__ = _base.Executor.shutdown.__doc__
+
+class ScheduledThreadPoolExecutor(ThreadPoolExecutor):
+ def __init__(self, max_workers):
+ super().__init__(max_workers)
+ self._work_queue = _DelayedWorkItemQueue()
+
+ def schedule(self, init_delay, fn, *args, **kwargs):
+ f = _base.DelayedFuture(time() + init_delay)
+ return self._schedule(f, fn, *args, **kwargs)
+
+ def schedule_fixed_rate(self, init_delay, period, fn, *args, **kwargs):
+ f = _base.DelayedFuture(time() + init_delay, period=period)
+ return self._schedule(f, fn, *args, **kwargs)
+
+ def schedule_fixed_delay(self, init_delay, delay, fn, *args, **kwargs):
+ f = _base.DelayedFuture(time() + init_delay, delay=delay)
+ return self._schedule(f, fn, *args, **kwargs)
+
+ def _schedule(self, f, fn, *args, **kwargs):
+ with self._shutdown_lock:
+ if self._shutdown:
+ raise RuntimeError('cannot schedule new futures after shutdown')
+
+ w = _DelayedWorkItem(self._work_queue, f, fn, args, kwargs)
+
+ self._work_queue.put(w)
+ self._adjust_thread_count()
+ return f
--- cpython.orig/Lib/queue.py 2013-05-07 08:21:21.000000000 +0000
+++ cpython-b3e1be1493a5/Lib/queue.py 2013-05-10 16:32:50.000000000 +0000
@@ -247,3 +247,71 @@
def _get(self):
return self.queue.pop()
+
+
+class SchedQueue(PriorityQueue):
+ '''Variant of Queue that retrieves open entries as their deadlines expire.
+ '''
+
+ def put(self, delay, item, block=True, timeout=None):
+ self.put_abs(time() + delay, item, block, timeout)
+
+ def put_abs(self, deadline, item, block=True, timeout=None):
+ super().put((deadline, item), block, timeout)
+
+ def _put(self, data):
+ deadline, item = data
+
+ do_notify = False
+ if self.queue:
+ earliest_deadline, _ = self.queue[0]
+ if deadline < earliest_deadline:
+ do_notify = True
+ else:
+ do_notify = True
+
+ heappush(self.queue, (deadline, item))
+
+ if do_notify:
+ self.not_empty.notify_all()
+
+ def get(self, block=True, timeout=None):
+ with self.not_empty:
+ if not self.queue and not block:
+ raise Empty
+ if timeout is not None:
+ if timeout < 0:
+ raise ValueError("'timeout' must be a positive number")
+ else:
+ timeout_deadline = time() + timeout
+
+ while True:
+ if not self.queue:
+ if timeout is None:
+ self.not_empty.wait()
+ else:
+ delay = timeout_deadline - time()
+ if delay > 0:
+ self.not_empty.wait(delay)
+ else:
+ raise Empty
+ else:
+ deadline, item = self.queue[0]
+ now = time()
+
+ if now >= deadline:
+ heappop(self.queue)
+ if self.queue:
+ self.not_empty.notify_all()
+ return item
+ elif not block:
+ raise Empty
+ elif timeout is None:
+ self.not_empty.wait(deadline - now)
+ else:
+ deadline = min(deadline, timeout_deadline)
+ delay = deadline - now
+ if delay > 0:
+ self.not_empty.wait(delay)
+ else:
+ raise Empty
from concurrent.futures import ScheduledThreadPoolExecutor
import time


def say(text):
    """Print *text* prefixed with the current wall-clock time."""
    print("{}: {}".format(time.ctime(), text))


# Exercise the three scheduling entry points of the proposed executor.
# NOTE(review): indentation was lost in the archived attachment; the extent
# of the ``with`` block is reconstructed -- confirm against the original file.
with ScheduledThreadPoolExecutor(5) as p:
    # One-shot task, run once after a 1 second delay.
    p.schedule(1, say, 'hello 1')
    # Fixed-rate task: no initial delay, period of 2 seconds.  Keep the
    # returned future so that it can be cancelled below.
    f = p.schedule_fixed_rate(0, 2, say, 'hello 2')
    # Fixed-delay task: no initial delay, 3 seconds between invocations.
    p.schedule_fixed_delay(0, 3, say, 'hello 3')
    # Let the periodic tasks fire a few times before cancelling one of them.
    time.sleep(6)
    say("cancelling: %s" % f)
    f.cancel()
    # Only 'hello 3' is expected to keep printing during this window
    # (presumably -- depends on the cancel() above succeeding).
    time.sleep(10)
    say("shutting down")
_______________________________________________
Python-bugs-list mailing list
Unsubscribe:
http://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com