Whoops, forgot to attach the files. Let's see if this works :)
--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups
"TurboGears" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to [EMAIL PROTECTED]
For more options, visit this group at http://groups.google.com/group/turbogears
-~----------~----~----~----~------~----~------~--~---
import cherrypy
from cherrypy._cpwsgi import wsgiApp
import wsgiserver
from wsgiserver import HybrideWSGIServer, fix_thread_local
# Monkeypatch wsgiserver.handle_request to handle cherrypy's threadlocal
# objects.  (The mail client had wrapped this statement across two lines,
# which was a syntax error.)
wsgiserver.handle_request = \
    fix_thread_local(cherrypy.serving)(wsgiserver.handle_request)
class CherrypyWSGIServer(HybrideWSGIServer):
    """ A wrapper around WSGIServer that pulls its configuration from the
    cherrypy config (server.socket_host, server.socket_port, etc.)."""
    def __init__(self):
        conf = cherrypy.config.get
        # BUG FIX: the config key was misspelled "server.socket._host";
        # the cherrypy 2.x key is "server.socket_host" (cf. server_name below).
        bind_addr = (conf("server.socket_host") or '',
                     conf("server.socket_port"))
        num_threads = conf("server.thread_pool")
        server_name = conf("server.socket_host")
        # Fall back to sane defaults when the keys are unset.
        backlog = conf("server.socket_queue_size") or 5
        granny = conf("server.event_granularity") or 0.1
        HybrideWSGIServer.__init__(self, bind_addr, wsgiApp, num_threads,
                                   server_name,
                                   request_queue_size=backlog,
                                   event_granularity=granny)
#!C:\Python24\python.exe
# Startup script for the 'scribble' TurboGears application, running on the
# HybrideWSGIServer instead of cherrypy's builtin server.
import pkg_resources
pkg_resources.require("TurboGears")
import turbogears
import cherrypy
cherrypy.lowercase_api = True
from os.path import *
import sys
# first look on the command line for a desired config file,
# if it's not on the command line, then
# look for setup.py in this directory. If it's not there, this script is
# probably installed
if len(sys.argv) > 1:
    turbogears.update_config(configfile=sys.argv[1],
        modulename="scribble.config")
elif exists(join(dirname(__file__), "setup.py")):
    # Development checkout: use the dev configuration.
    turbogears.update_config(configfile="dev.cfg",
        modulename="scribble.config")
else:
    # Installed: use the production configuration.
    turbogears.update_config(configfile="prod.cfg",
        modulename="scribble.config")
from scribble.controllers import Root
cherrypy.root = Root()
from cpwsgiserver import CherrypyWSGIServer
# initOnly=True: let cherrypy initialise itself but do not start its builtin
# HTTP server -- we run our own CherrypyWSGIServer below.
cherrypy.server.start(initOnly=True, serverClass=CherrypyWSGIServer)
s = CherrypyWSGIServer()
try:
    s.start()
except:
    # Shut the server down cleanly before re-raising (incl. KeyboardInterrupt).
    s.stop()
    raise
""" A WSGI server that supports simple `continuations`
This is an (experimental) WSGI server that builds on the threaded cherrypy
WSGI server. It adds an option to 'suspend' requests that are not ready to
produce content. A WSGI app can do so by yielding an empty string (''). The
server will use this indicator to transfer the request from the pre-emptive
threadpool into a 'microthread' pool. This frees the pre-emptive thread to
work on active requests.
As a result the server can handle a reasonable number of pending requests
and still be responsive to active requests.
Example usage:
--------------
The server will keep polling the app after it has moved the request out of the
pre-emptive threadpool. One can use the `while some_nonblocking_predicate:
yield '' `
construct to keep the app in a `suspended` state.
>>> import time
>>> def timeout_after(delay):
>>>     end = time.clock() + delay
>>>     return lambda: (time.clock() >= end)
>>>
>>> def SlowWSGIApp(environ, start_response):
>>> response_headers = [('Content-type','text/plain')]
>>> start_response("200 OK", response_headers)
>>> ready = timeout_after(5)
>>> while not ready(): yield ''
>>> yield 'Hello world!'
>>>
>>> s = HybrideWSGIServer(('',8080), SlowWSGIApp, numthreads=1)
>>> s.start()
Note on usage in combination with a frameworks:
-----------------------------------------------
Most web frameworks use threadlocal objects to store request-specific
information in the framework namespace. The fix_thread_local decorator
makes sure that thread-local objects are transferred between threads. E.g,
>>> import cherrypy
>>> handle_request = fix_thread_local(cherrypy.serving)(handle_request)
There might also be other thread-safety issues specific to your framework.
Note on performance:
-------------------
The goal of this server is to quickly build a `comet` enabled server that
integrates with TurboGears. You could use this server to test higher-level
abstractions to do message routing and such.
This server would, however, be a bad solution for public / potentially
high-traffic services. The pending requests are continuously polled for
state changes, thus greatly limiting the maximum number of pending
connections it can sustain. E.g., my development notebook can do some 30k
yields in 0.1 sec. Given that the app will need to check at least one
predicate per connection, this would limit the performance to 15k
connections. Apps also need CPU time to do something useful, thus actual
performance will be much lower. A simple chat application implemented using
the TurboGears framework has been tested with 1024 connections resulting in
65% CPU utilization.
The WSGI 1.0 spec has limited support for async applications. The spec,
however, does allow a custom server extension API which could be used to
hook event triggers into the server event loop. This would remove the
wasteful polling of the whole app stack. Furthermore, when deployed on a
potentially slow internet connection it would be wise to switch from
threaded to non-blocking or even asynchronous IO. So, as one can see, there
is plenty of room to improve performance. It would also be wise to look
into the Twisted framework: Twisted has a lot of advanced async IO code
which can be leveraged to build high-performance servers.
Copyright (c) 2006 Joost Moesker
Published under the MIT License
"""
from copy import copy
from cherrypy import _cpwsgiserver
from cherrypy._cpwsgiserver import *
# Scheduling states yielded by a request task: _THREAD means "run the next
# step on a pre-emptive worker thread"; _NO_THREAD means "park the task in
# the polling 'microthread' loop".
_NO_THREAD, _THREAD = "_not_thread", "_thread"
# Sentinel used to tell worker threads / the dispatch loop to shut down.
_SHUTDOWNREQUEST = None
# generic helpers:
def watchdog( func, callback, threshold=0):
    """ Monitor execution time and make a callback if it exceeded the
    threshold.

    ``callback`` receives a dict with keys 'callable' (the wrapped function)
    and 'execution_time' (the elapsed seconds).
    """
    import time
    # time.clock() measured wall-clock time on Windows but CPU time on Unix,
    # and was removed in Python 3.8.  Prefer time.perf_counter when it exists
    # and fall back to time.clock on old interpreters (backward compatible).
    timer = getattr(time, 'perf_counter', None) or time.clock
    def wrapper(*arg, **kw):
        start = timer()
        results = func(*arg, **kw)
        et = timer() - start
        if et > threshold:
            callback({'callable':func, 'execution_time' :et})
        return results
    return wrapper
def fix_thread_local(thread_local):
    """Task decorator that makes a thread-local's state follow the task
    between threads.

    Before each value is yielded, the thread-local's attributes are stashed
    and cleared (so the current thread holds no stale request state while
    another thread may resume the task); when the task resumes, the stash
    is restored.
    """
    def decorator(task):
        def wrapper(*args, **kwargs):
            source = task(*args, **kwargs)
            for item in source:
                # Note: thread_local.__dict__ must be re-fetched each step --
                # for a real threading.local it differs per thread.
                stash = thread_local.__dict__.copy()
                thread_local.__dict__.clear()
                yield item
                thread_local.__dict__.update(stash)
        return wrapper
    return decorator
def handle_request(request):
    """ Drives the request after it has been accepted

    This `coroutine` handles the request after it has been accepted by the
    server. It controls how each code section will be executed. Yielding
    _THREAD will make sure that the next code segment will be executed in a
    pre-emptive thread. Yielding _NO_THREAD yields control back to the
    `microthread` scheduler.
    """
    # NOTE: parse_request calls rfile.readline which can block, so requests
    # must be started up in a thread.
    #yield _THREAD
    request.parse_request()
    if not request.ready:
        # Malformed or empty request: close the connection and end the task.
        request.terminate()
        raise StopIteration
    response = request.wsgi_app( request.environ, request.start_response)
    for data in response:
        if data == '':
            # App is not ready to produce output: suspend into the polling
            # 'microthread' loop until it yields real data.
            yield _NO_THREAD
        else:
            # Real data: run the (potentially blocking) socket write on a
            # pre-emptive worker thread.
            yield _THREAD
            try:
                request.write(data)
            except (KeyboardInterrupt, SystemExit):
                request.terminate() # lets play nice and close the connection
                raise
            except Exception, e:
                # Silently drop common client-disconnect socket errors;
                # log anything else, then stop producing output.
                if len(e.args) and e.args[0] in socket_errors_to_ignore: break
                traceback.print_exc()
                break
    if hasattr(response, "close"):
        response.close()
    request.terminate()
class WorkerThread(threading.Thread):
    """A worker thread that dispatches a task depending on its 'state'. """
    def __init__(self, server):
        # ``ready`` is flipped once run() starts; the base server polls it.
        self.ready = False
        self.server = server
        # Shared with the server: queue of tasks to run on worker threads,
        # and the list of tasks parked in the polling 'microthread' loop.
        self.task_queue = server.task_queue
        self.task_list = server.task_list
        threading.Thread.__init__(self)
    def run(self):
        self.ready = True
        task_list, task_queue = self.task_list, self.task_queue
        while True:
            task = self.task_queue.get()
            if task is _SHUTDOWNREQUEST:
                return
            try:
                # Advance the task one step; the yielded state says where
                # the next step should run.
                state = task.next()
                # Dispatch the next step:
                # XXX: seems wasteful -- why not continue in the same thread?
                if state is _THREAD: task_queue.put(task)
                if state is _NO_THREAD: task_list.append(task)
            except StopIteration:
                # Task finished; make sure it is not left in the poll list.
                if task in task_list: task_list.remove(task)
            except (KeyboardInterrupt, SystemExit), exc:
                self.server.interrupt = exc
# XXX: monkey patch the original WorkerThread class in _cpwsgiserver so the
# base CherryPyWSGIServer spawns our state-dispatching workers instead.
_cpwsgiserver.WorkerThread = WorkerThread
class HybrideWSGIServer(CherryPyWSGIServer):
""" WSGI server that can handle longlived HTTP connections.
Important parameters:
* ``bind_addr`` - A (hostname, port) tuple on which the server should
listen.
* ``wsgi_app`` - The WSGI Application e.g, cherrypy._cpwsgi.wsgiApp
* ``numthreads`` - Numer of worker threads in the threadpool (default=10)
* ``server_name`` - Name of the server (default=None)
* ``max`` - Size of the thread pool queue size (default=-1, unlimited)
* ``request_queue`` - Maximum backlog on the listing socket (default=5)
* ``timeout`` - Default socket timeout (default=10)
* ``event_granularity`` - Time between application state checks
(default=0.1)
"""
ready = False
interrupt = None
ProtocolHandlerClass = HTTPRequest
def __init__(self, bind_addr, wsgi_app, numthreads=10, server_name=None,
max=-1, request_queue_size=5, timeout=10,
event_granularity=0.1):
"""Be careful w/ max."""
CherryPyWSGIServer.__init__(self, bind_addr, wsgi_app, numthreads=10,
server_name=None, max=-1,
request_queue_size=5,
timeout=10 )
self.task_list = []
self.task_queue = self.requests
self.event_granularity = event_granularity
def start(self):
"""Run the server forever."""
# Create a thread that runs the dispatch loop:
t = threading.Thread(target=self.dispatch_loop,
args=[self.event_granularity])
t.start()
CherryPyWSGIServer.start(self)
def tick(self):
"""Accepts incomming connections and dispatch a task to the
task_queue"""
try:
s, addr = self.socket.accept() #seems to be blocking...
if hasattr(s, 'settimeout'):
s.settimeout(self.timeout)
request = self.ProtocolHandlerClass(s, addr, self)
task = handle_request( request)
self.task_queue.put(task) # start out in a thread
except socket.timeout:
# The only reason for the timeout in start() is so we can
# notice keyboard interrupts on Win32, which don't interrupt
# accept() by default
return
except (KeyboardInterrupt, SystemExit), exc:
self.interrupt = exc
def dispatch_loop(self, granularity=0.1):
""" Polling loop to handle pending requests."""
task_list, task_queue = self.task_list, self.task_queue
try:
while True:
start = time.clock()
task_list_copy = copy(self.task_list) # workerthreads can
modify the tasklist concurently, so we make shallow copy
for task in task_list_copy:
if task is _SHUTDOWNREQUEST:
return
try:
state = task.next()
# Dispatch the next step:
if state is _THREAD:
task_list.remove(task)
task_queue.put(task)
#elif _NO_THREAD: pass
except StopIteration:
if task in task_list: task_list.remove(task)
delay = time.clock() - start
time.sleep( max( (granularity - delay), 0))
except (KeyboardInterrupt, SystemExit):
self.interrupt = exc
def stop(self):
"""Gracefully shutdown a server that is serving forever."""
self.task_list.append(_SHUTDOWNREQUEST)
CherryPyWSGIServer.stop(self)