Christopher Stawarz wrote:
> This is the third draft of my proposed extensions for better supporting WSGI apps on asynchronous servers. The major changes since the last draft are as follows:


First of all, thanks for your effort.

> * The title and abstract now accurately reflect the scope of the proposal.
>   In addition, the extensions are now in the namespace "x-wsgiorg.fdevent"
>   (instead of "x-wsgiorg.async").

> * The proposal for an alternative, non-blocking input stream has been
>   dropped, since I don't see a way to add one that wouldn't break middleware.

Well, IMHO the "general" solution here is to use greenlets.

>   Instead, the spec recommends that async servers pre-read the request body
>   before invoking the app (either by default or as a configurable option).


This is the best solution most of the time (though not always), especially if the "server" can do some "pre-parsing" of a multipart/form-data request body.

In fact I plan to write a custom function (in C for Nginx) that will "reduce", as an example:

   Content-Type: multipart/form-data; boundary=AaB03x

   --AaB03x
   Content-Disposition: form-data; name="submit-name"

   Larry
   --AaB03x
   Content-Disposition: form-data; name="files"; filename="file1.txt"
   Content-Type: text/plain

   ... contents of file1.txt ...
   --AaB03x--

to (not properly escaped):

Content-Type: application/x-www-form-urlencoded

submit-name=Larry&files.filename=file1.txt&files.ctype=text/plain&files.path=xxx


and the contents of file1.txt will be saved to a temporary file 'xxx'.
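
To illustrate how an application might consume such a "reduced" body, here is a minimal sketch in Python 3. It assumes the `.filename` / `.ctype` / `.path` suffix convention shown in the example above; the helper name `split_reduced_form` is hypothetical and not part of any existing module.

```python
from urllib.parse import parse_qsl

# Suffixes the server is assumed to append for each uploaded file,
# following the example in this message.
FILE_ATTRS = ('filename', 'ctype', 'path')


def split_reduced_form(body):
    """Split a reduced urlencoded body into plain fields and file records."""
    fields, files = {}, {}
    for key, value in parse_qsl(body, keep_blank_values=True):
        name, dot, attr = key.rpartition('.')
        if dot and attr in FILE_ATTRS:
            # e.g. "files.path" -> files['files']['path']
            files.setdefault(name, {})[attr] = value
        else:
            fields[key] = value
    return fields, files


body = ('submit-name=Larry&files.filename=file1.txt'
        '&files.ctype=text/plain&files.path=xxx')
fields, files = split_reduced_form(body)
```

Note that with this naming scheme an ordinary form field whose name happens to end in `.path` would be mistaken for a file attribute, so a real implementation would need proper escaping or a reserved prefix.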



> Once again, I'd appreciate your comments.



I have some comments:

1) Why not add a more generic poll-like interface?

   Moreover, IMHO storing a timeout variable in the environ to check whether
   the previous call timed out is not the best solution.

   In my implementation I return a function, but with generators in
   Python 2.5 this can be done in a better way.

2) In Nginx it is not possible to simply handle "plain" file
   descriptors, since these are wrapped in a connection structure.

   This is the reason why I had to add a connection_wrapper function in
   my WSGI module for Nginx.

3) If you read an example that implements a database connection pool:
http://hg.mperillo.ath.cx/nginx/mod_wsgi/file/tip/examples/nginx-postgres-async.py

   you can see that there is a problem.

   In fact the pool is not very flexible; the application cannot handle
   more than POOL_SIZE concurrent requests.

   However, it is possible to simply make a new request wait until a
   previous connection is free (or a timeout occurs).

   I have attached an example (it is not in the repository since there
   are some problems).

   The examples use a new extension:

     - ctx = environ['ngx.request_context']()
     - ctx.resume()

   ctx.resume() "asynchronously" resumes the given request
   (it will be resumed as soon as control returns to Nginx, when the
    application yields something).


   Note that the problem of resuming another request is easily solved
   with greenlets, without the need for new extensions
   (this is one of the reasons why I like greenlets).
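
Regarding point 1, here is a rough sketch (Python 3 syntax) of what delivering the poll result through the generator itself could look like, using the `send()` method that generators gained in Python 2.5. This is only an illustration under stated assumptions: the `('poll', timeout)` request tuple and the `run` driver are hypothetical, and a standard WSGI server iterates the application with plain `next()` calls, so this is not something that works with WSGI as specified.

```python
def app(environ):
    # The application yields a poll request (timeout in milliseconds) and
    # receives the outcome as the value of the yield expression, instead of
    # looking up a "timed out" flag in environ afterwards.
    ready = yield ('poll', 5000)
    if ready is None:
        yield 'query timed out'
    else:
        yield 'ready: %r' % (ready,)


def run(app_iter, poll_result):
    """Toy driver standing in for the server's event loop (hypothetical)."""
    request = next(app_iter)           # run the app up to its first yield
    assert request[0] == 'poll'
    # A real server would wait for the event or the timeout here, then
    # resume the application with the result (None meaning "timed out").
    return app_iter.send(poll_result)
```

For example, `run(app({}), None)` produces the timeout message, while `run(app({}), (7, 1))` resumes the application with a (descriptor, flags) pair.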


> [...]



Regards,
Manlio Perillo

from collections import deque
import psycopg2 as db


# The table and the function are created by the setup script `postgres_setup.py`
query_select = "SELECT a, b, c, d, e FROM RandomTable LIMIT 10"
query_sleep = "SELECT * FROM sleep(1)"


# These constants are defined in the WSGI environment but their value
# is known
WSGI_POLLIN = 0x01
WSGI_POLLOUT = 0x04


# Size of the connection pool
POOL_SIZE = 20

# Free connections available
free_connections = deque()

# Connections waiting for a free slot
waiting_requests = deque()

# Number of concurrent connections
connections = 0

# State to be kept between requests
request_state = {}



def get_connection(environ):
    global connections

    print 'open', connections, len(free_connections), len(waiting_requests)

    if free_connections:
        print 'reuse'
        # reuse existing connection
        dbconn, c = free_connections.pop()
    elif connections < POOL_SIZE:
        print 'new'
        # create a new connection
        dbconn = db.connect(database='test')

        curs = dbconn.cursor()
        # XXX bad API, fileno should be a property of the connection object
        fd = curs.fileno()
        c = environ['ngx.connection_wrapper'](fd)

        connections = connections + 1
    else:
        print 'wait'
        # no free slots, this request will have to wait
        ctx = environ['ngx.request_context']()
        waiting_requests.append(ctx)

        return None, None

    # XXX check me
    environ['ngx.poll_register'](c, WSGI_POLLIN)

    return dbconn, c

def close_connection(environ, dbconn, c):
    print 'close', connections, len(free_connections), len(waiting_requests)

    environ['ngx.poll_unregister'](c)

    # resume a waiting request, if any
    if waiting_requests:
        print 'waiting'
        ctx = waiting_requests.pop()
        try:
            ctx.resume()
        except RuntimeError, e:
            # XXX check me, the request has been finalized
            # (in this case we need to increment the timeout)
            print 'Error:', e
            # push the connection in the free connections
            free_connections.append((dbconn, c))

            return

        # make the connection directly available to the waiting
        # request
        request_state[ctx] = (dbconn, c)
    else:
        print 'no waiting'
        # push the connection in the free connections
        free_connections.append((dbconn, c))


def application(environ, start_response):
    global connections

    headers = [
        ('Server', 'Test'),
        ('Content-Type', 'text/plain'),
        ('X-Powered-By', 'Python'),
        ]

    qs = environ.get('QUERY_STRING', 'sleep')
    poll = environ['ngx.poll']

    dbconn, c = get_connection(environ)
    if dbconn is None:
        # no free connection, wait until one becomes available or the
        # timeout expires
        poll(50000)
        yield ''

        ctx = environ['ngx.request_context']()

        dbconn, c = request_state.get(ctx, (None, None))
        if dbconn is None:
            start_response('500 ERROR', headers)
            print 'query wait timedout'
            yield 'query wait timedout'
            return

        # XXX check me
        environ['ngx.poll_register'](c, WSGI_POLLIN)

        ##del request_state[ctx]

    curs = dbconn.cursor()

    if qs == 'sleep':
        query = query_sleep
    else:
        query = query_select

    # XXX error handling
    curs.execute(query, async=True)
    while not curs.isready():
        state = poll(5000)
        yield ''

        c_, flags = state()
        if c_ is None:
            start_response('500 ERROR', headers)
            print 'query timedout'
            yield 'query timedout'

            # XXX we just throw away the connection
            dbconn.close()
            c.close()
            connections = connections - 1

            return

    r = curs.fetchall()
    close_connection(environ, dbconn, c)

    if qs == 'sleep':
        data = 'empty return set'
    else:
        data = '\n'.join([' | '.join(['%5d' % col for col in row]) for row in r])

    start_response('200 OK', headers)
    yield data