"Matt Culbreth" <[EMAIL PROTECTED]> writes:

> I'm playing out with a few things now and I wanted to see if anyone
> else has used SQLAlchemy in an asynchronous manner?  For example, you
> could create a service which responded to asynchronous requests for
> data, and could be used by a web client, desktop client, other types
> of clients, etc.

Yep, I'm currently building a Twisted application (client/server)
where the server is using SQLAlchemy.

> The sAsync project at http://foss.eepatents.com/sAsync/ seems ideally
> suited for this but I haven't seen any comments about it here.

For what it's worth, I opted not to go with sAsync (which looks very
nice) because I was looking for something non-GPL.  In the end, I
rolled something small of my own, since my requirements were simple.

I created a simple database class that contained a background thread
for execution.  The background thread owns the engine and session
objects and serializes any use of them through a run() method and
internal queue.  E.g., very simple single-thread pool.  The
application object instances themselves are accessed normally in the
main Twisted thread.

When a Twisted routine wants to use SQLAlchemy operations it just
calls run() or encapsulates them in a small function to operate on.  I
originally toyed with wrapping session or engine objects to more
dynamically or automatically adapt things, but wasn't comfortable with
the result.  So in the end it's pretty explicit in the code, but then
that's one of the attractive qualities to me of both Twisted and
SQLAlchemy - not too much "magic" going on implicitly.

The idea was that the overall objects involved are common at the
Twisted thread level so you're application is already set for a single
threaded (asynchronous) environment.  This just defers the actual
database operations (via SA) into the background thread, but doesn't
attempt to simulate multiple threads of object state (such as having a
distinct session per thread in a multi-threaded application might do).

In case you're interested, it's tiny, so here's my current database
class (still needs some final error checking polish, but I'm using it
in development):

          - - - - - - - - - - - - - - - - - - - - - - - - -

import threading
import Queue

import sqlalchemy as sa
import sqlalchemy.orm as orm

from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.python.failure import Failure


#
# --------------------------------------------------------------------------
#

class Database(object):
    """Wrapper for a SQLAlchemy engine supporting execution of database
    related activities in a background thread with the result communicated
    back through a deferred.

    If a metadata object is provided to the constructor, it is connected
    to the underlying engine within the background thread.  (And is,
    presumably a DynamicMetaData object)"""

    def __init__(self, db_url, metadata=None, *engine_args, **engine_kwargs):
        self.queue = Queue.Queue()
        self.started = threading.Event()
        
        self.dbthread = threading.Thread(target=self._dbThread,
                                         args=(db_url, metadata,
                                               engine_args, engine_kwargs))
        self.dbthread.setDaemon(True)
        self.dbthread.start()
        self.started.wait()


    #
    # Background execution thread
    #
    
    def _dbThread(self, db_url, metadata, eargs, ekwargs):
        self.engine = sa.create_engine(db_url, *eargs, **ekwargs)
        self.session = orm.create_session(self.engine)
        if metadata:
            metadata.connect(self.engine)
        self.started.set()

        while 1:
            op = self.queue.get()
            if op is None:
                return
            else:
                func, args, kwargs, d = op
                try:
                    result = d.callback, func(*args, **kwargs)
                except:
                    result = d.errback, Failure()
                reactor.callFromThread(*result)

    #
    # Primary thread entry points
    #

    def run(self, func, *args, **kwargs):
        result = Deferred()
        self.queue.put((func, args, kwargs, result))
        return result

    def shutdown(self):
        self.queue.put(None)
        self.dbthread.join(2)

          - - - - - - - - - - - - - - - - - - - - - - - - -

Typical initialization (startup before initial reactor.run()):

    db = database.Database('sqlite:///%s' % db_filename,
                           metadata=schema.metadata)

And here's a simple retrieval obj objects through the ORM within a
server routine (a PB published object):

    def remote_jobs(self):
        return self.db.run(lambda:self.db.session.query(model.Job).select())

Here's an operation with a few more steps encapsulated into a function (it
saves a copy of a deleted object into an archive table):

    def remote_deleteJob(self, job_id):
        
        def dbop(job_id=job_id):
            s = self.db.session
            job = s.query(model.Job).get(job_id)
            s.delete(job)
            # Create duplicate for archiving (Skipping _* attributes
            # ignores all the SA state stuff)
            archive = model.Job()
            for attr, value in job.__dict__.items():
                if attr[0] != '_': setattr(archive, attr, value)
            s.save(archive, entity_name='deleted')
            s.flush()

        d = self.db.run(dbop)
        return d

And if you've got Python 2.5, here's an inlineCallbacks method
(slighted edited down):

    @inlineCallbacks
    def remote_uploadRequest(self, job_id, job_file):
        job = yield self.db.run(lambda:
                                self.db.session(model.Job).get(job_id))
        if not job:
            raise Exception('Invalid job')
            
        # Mark that it hasn't been uploaded yet
        job_file.uploaded = None

        def saveFile(job_file=job_file):
            self.db.session.save(job_file)
            self.db.session.flush()

        # Insert it into the database
        yield self.db.run(saveFile)

        upload_key = self.fileio.setupUpload(job_file.name)

        returnValue({'port': self.port, 'key': upload_key})


-- David


--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To post to this group, send email to sqlalchemy@googlegroups.com
To unsubscribe from this group, send email to [EMAIL PROTECTED]
For more options, visit this group at 
http://groups.google.com/group/sqlalchemy?hl=en
-~----------~----~----~----~------~----~------~--~---

Reply via email to