At 08:41 PM 2/13/2007 +0000, [EMAIL PROTECTED] wrote: > and the microthreading features being discussed here are a trivial hack > somewhere in its mainloop machinery, not an entirely new subsystem that > it should be implemented in terms of.
It doesn't even require hacking the mainloop; it can be done entirely in userspace. Here's some code I wrote a few months ago but never got around to using it. (Or testing it, for that matter, so it may not work, or even compile!) It requires Python 2.5 and the "simplegeneric" package from the Cheeseshop, and it allows you to "spawn" generators to create independently-running pseudothreads. These pseudothreads can yield to Deferreds, "returning" the value or failure thereof. (Failures cause the error to be raises such that it appears to happen at the yield point.) There are also a few other yield targets like "Return" (to return a value to a calling co-routine), "Pause" (to delay resumption), and "with_timeout" (to wrap some other yieldable in a timeout that raises TimeoutError). Enjoy! import types from twisted.internet import reactor from twisted.internet.defer import Deferred, TimeoutError from twisted.python import failure from simplegeneric import generic from functools import partial __all__ = [ # user APIs: 'spawn', 'Pause', 'Return', 'with_timeout', # extension APIs: 'schedule', 'resume', 'throw', 'current_task', 'yield_to', 'yieldable', ] def spawn(geniter, delay=0): """Create a new task and schedule it for execution Usage:: spawn(someGeneratorFunc(args)) The given generator-iterator will be run by the Twisted reactor after `delay` seconds have passed (0 by default). The return value of this function is a "task" object that can be passed to the ``schedule()``, ``throw()``, and ``yield_to()`` APIs. """ task = [geniter] schedule(task, delay) return task def schedule(task, delay=0, value=None): """Schedule `task` to resume after `delay` seconds (w/optional `value`)""" if task: return reactor.callLater(delay, resume, task, value) # XXX warn if non-None value sent to empty task? def resume(task, value=None): """Resume `task` immediately, returning `value` for the current yield""" if task: _invoke(task, partial(task[-1].send, value)) # XXX warn if non-None value sent to empty task? def throw(task, exc): """Raise `exc` tuple in `task` and immediately resume its execution""" if not task: # Propagate exception out of a failed task raise exc[0], exc[1], exc[2] _invoke(task, partial(task[-1].throw, *exc)) def _invoke(task, method): """Invoke method() in context of `task`, yielding to the return value""" try: value = method() except StopIteration: task.pop() # it's an exit with no return value resume(task) # so send None up the stack except: task.pop() # Propagate exception up the stack throw(task, sys.exc_info()) else: # Handle a yielded value yield_to(value, task) @generic def yield_to(value, task): """Handle a yielded value To register special handlers, use [EMAIL PROTECTED]()`` or or [EMAIL PROTECTED]()``. (See the ``simplegeneric`` docs for details.) """ raise TypeError( "Unrecognized yield value (maybe missing Return()?): %r" % (value,) ) @yield_to.when_type(defer.Deferred): def _yield_to_deferred(value, task): """Return a deferred value immediately or pause until fired""" def _resume(value): if isinstance(value, failure.Failure): throw(task, (type(value), value, None)) # XXX extract error? else: resume(task, value) return value # don't alter the deferred's value # This will either fire immediately, or delay until the appropriate time value.addCallbacks(_resume, _resume) @yield_to.when_type(types.GeneratorType) def _yield_to_subgenerator(value, task): """Call a sub-generator, putting it on the task's call stack""" task.append(value) schedule(task) def yieldable(f): """Declare that a function may be yielded to Usage:: @yieldable def do_something(task): # code that will be invoked upon (yield do_something) (yield do_something) The function's return value will be ignored, and errors in it are NOT caught! It must instead use the ``resume()``, ``schedule()``, ``throw()``, or ``yield_to()`` APIs to communicate with the yielding task (which is provided as the sole argument.) """ f.__yieldable__ = True return f @yield_to.when_type(types.FunctionType) def _yield_to_function(value, task): """Invoke task-management function""" if getattr(value, '__yieldable__', None): return value(task) # function is marked as yieldable else: raise TypeError( "Function is not marked yieldable (maybe missing Return()?): %r" % (value,) ) @yieldable def current_task(task): """Yield this function (don't call it) to obtain the current task Usage: ``task = (yield current_task)`` The yielding coroutine is immediately resumed; no task switching will occur. """ resume(task, task) class Pause(object): """Object that can be yielded to temporarily pause execution Usage:: yield Pause # allow other tasks to run, then resume ASAP yield Pause(5) # don't run for at least five seconds """ seconds = 0 def __init__(self, seconds=0): self.seconds = seconds def __repr__(self): return 'Pause(%s)' % self.seconds @yield_to.when_type(Pause) @yield_to.when_object(Pause) def _yield_to_pause(value, task): """Allow other tasks to run, by moving this task to the end of the queue""" schedule(task, value.seconds) class Return(object): """Return a value to your caller Usage:: yield Return(42) # returns 42 to calling coroutine yield Return # returns None to calling coroutine """ value = None def __init__(self, value): self.value = value def __repr__(self): return 'Return(%s)' % self.value @yield_to.when_type(Return) @yield_to.when_object(Return) def _yield_to_return(value, task): """Return a value to the caller""" task.pop().close() # ensure any ``finally`` clauses are run resume(task, value.value) def with_timeout(seconds, value): """Raise TimeoutError if `value` doesn't resume before `seconds` elapse Example:: result = yield with_timeout(30, something(whatever)) This is equivalent to ``result = yield something(whatever)``, except that if the current task isn't resumed in 30 seconds or less, a ``defer.TimeoutError`` will be raised in the blocked task. (Note: if the The `value` passed to this routine may be any yieldable value, although it makes no sense to yield a value that will just be returned to a parent coroutine. Note that the ``something(whatever)`` subtask may trap the ``TimeoutError`` and misinterpret its significance, so generally speaking you should use this only for relatively low-level operations, and expose a timeout parameter to your callers, to allow them to simply specify the timeout rather than using this wrapper. In fact, this is what most Twisted APIs do, providing a ``timeout`` keyword argument that you should use instead of this wrapper, as they will then raise ``TimeoutError`` automatically after the specified time expires. So, if you're calling such an API, don't use this wrapper. """ # raise a timeout error in this task after seconds delayedCall = reactor.callLater( seconds, throw, (yield current_task), (TimeoutError, TimeoutError(), None) ) try: yield Return((yield value)) # call the subtask and return its result finally: # Ensure timeout call doesn't fire after we've exited if delayedCall.active(): delayedCall.cancel() _______________________________________________ Python-Dev mailing list Python-Dev@python.org http://mail.python.org/mailman/listinfo/python-dev Unsubscribe: http://mail.python.org/mailman/options/python-dev/archive%40mail-archive.com