"Matt Culbreth" <[EMAIL PROTECTED]> writes: > I'm playing out with a few things now and I wanted to see if anyone > else has used SQLAlchemy in an asynchronous manner? For example, you > could create a service which responded to asynchronous requests for > data, and could be used by a web client, desktop client, other types > of clients, etc.
Yep, I'm currently building a Twisted application (client/server) where
the server is using SQLAlchemy.

> The sAsync project at http://foss.eepatents.com/sAsync/ seems ideally
> suited for this but I haven't seen any comments about it here.

For what it's worth, I opted not to go with sAsync (which looks very
nice) because I was looking for something non-GPL.  In the end, I rolled
something small of my own, since my requirements were simple.

I created a simple database class that contains a background thread for
execution.  The background thread owns the engine and session objects
and serializes any use of them through a run() method and an internal
queue; i.e., a very simple single-thread pool.

The application object instances themselves are accessed normally in the
main Twisted thread.  When a Twisted routine wants to perform SQLAlchemy
operations, it passes them to run(), either directly or wrapped in a
small function.

I originally toyed with wrapping session or engine objects to more
dynamically or automatically adapt things, but wasn't comfortable with
the result.  So in the end it's pretty explicit in the code, but then
that's one of the attractive qualities to me of both Twisted and
SQLAlchemy: not too much "magic" going on implicitly.

The idea is that the objects involved are shared at the Twisted thread
level, so your application is already set up for a single-threaded
(asynchronous) environment.  This just defers the actual database
operations (via SA) to the background thread, but doesn't attempt to
simulate multiple threads of object state (as having a distinct session
per thread in a multi-threaded application would).

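The single-worker-thread pattern described above can be illustrated
without Twisted or SQLAlchemy.  What follows is only a minimal sketch
using the Python 3 standard library, not the class from this post:
SerialWorker and make_resource are hypothetical names, a plain dict
stands in for the engine/session pair, and a concurrent.futures.Future
stands in for the Twisted Deferred.

```python
import queue
import threading
from concurrent.futures import Future


class SerialWorker:
    """Hypothetical sketch: one background thread owns a resource and
    runs every queued operation against it, one at a time."""

    def __init__(self, make_resource):
        self._queue = queue.Queue()
        self._started = threading.Event()
        self._thread = threading.Thread(
            target=self._worker, args=(make_resource,), daemon=True)
        self._thread.start()
        # Block until the worker thread has created the resource.
        self._started.wait()

    def _worker(self, make_resource):
        # The resource is created in, and only ever used from, this thread.
        self.resource = make_resource()
        self._started.set()
        while True:
            op = self._queue.get()
            if op is None:  # shutdown sentinel
                return
            func, args, kwargs, future = op
            try:
                future.set_result(func(*args, **kwargs))
            except Exception as exc:
                future.set_exception(exc)

    def run(self, func, *args, **kwargs):
        """Queue func for the worker thread; return a Future for its result."""
        future = Future()
        self._queue.put((func, args, kwargs, future))
        return future

    def shutdown(self):
        self._queue.put(None)
        self._thread.join(2)


# Usage: a dict stands in for the database session (an assumption
# made purely for illustration).
worker = SerialWorker(dict)
worker.run(lambda: worker.resource.update(answer=42)).result()
value = worker.run(lambda: worker.resource["answer"]).result()
worker.shutdown()
```

Because every operation passes through one queue, operations execute
strictly in submission order, so the owned resource never needs its own
locking.  Like the real class, this sketch would block in the
constructor if creating the resource raised an exception.
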
In case you're interested, it's tiny, so here's my current database
class (still needs some final error checking polish, but I'm using it in
development):

          - - - - - - - - - - - - - - - - - - - - - - - - -

import threading
import Queue

import sqlalchemy as sa
import sqlalchemy.orm as orm

from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.python.failure import Failure

#
# --------------------------------------------------------------------------
#

class Database(object):
    """Wrapper for a SQLAlchemy engine supporting execution of database
    related activities in a background thread with the result communicated
    back through a deferred.

    If a metadata object is provided to the constructor, it is connected
    to the underlying engine within the background thread.  (And is,
    presumably, a DynamicMetaData object.)"""

    def __init__(self, db_url, metadata=None, *engine_args, **engine_kwargs):
        self.queue = Queue.Queue()
        self.started = threading.Event()
        self.dbthread = threading.Thread(target=self._dbThread,
                                         args=(db_url, metadata,
                                               engine_args, engine_kwargs))
        self.dbthread.setDaemon(True)
        self.dbthread.start()
        # Block until the background thread has set up engine and session
        self.started.wait()

    #
    # Background execution thread
    #

    def _dbThread(self, db_url, metadata, eargs, ekwargs):
        # Engine and session are created in, and only used from, this thread
        self.engine = sa.create_engine(db_url, *eargs, **ekwargs)
        self.session = orm.create_session(self.engine)
        if metadata:
            metadata.connect(self.engine)
        self.started.set()

        while 1:
            op = self.queue.get()
            if op is None:
                # Shutdown sentinel
                return
            else:
                func, args, kwargs, d = op

            # Pair the result (or failure) with the matching deferred method
            try:
                result = d.callback, func(*args, **kwargs)
            except:
                result = d.errback, Failure()

            # Fire the deferred back in the reactor (main) thread
            reactor.callFromThread(*result)

    #
    # Primary thread entry points
    #

    def run(self, func, *args, **kwargs):
        result = Deferred()
        self.queue.put((func, args, kwargs, result))
        return result

    def shutdown(self):
        self.queue.put(None)
        self.dbthread.join(2)

          - - - - - - - - - - - - - - - - - - - - - - - - -

Typical initialization (startup before initial reactor.run()):

    db = database.Database('sqlite:///%s' %
                           db_filename, metadata=schema.metadata)

And here's a simple retrieval of objects through the ORM within a server
routine (a PB published object):

    def remote_jobs(self):
        return self.db.run(lambda: self.db.session.query(model.Job).select())

Here's an operation with a few more steps encapsulated into a function
(it saves a copy of a deleted object into an archive table):

    def remote_deleteJob(self, job_id):

        def dbop(job_id=job_id):
            s = self.db.session
            job = s.query(model.Job).get(job_id)
            s.delete(job)

            # Create duplicate for archiving (Skipping _* attributes
            # ignores all the SA state stuff)
            archive = model.Job()
            for attr, value in job.__dict__.items():
                if attr[0] != '_':
                    setattr(archive, attr, value)
            s.save(archive, entity_name='deleted')

            s.flush()

        d = self.db.run(dbop)
        return d

And if you've got Python 2.5, here's an inlineCallbacks method (slightly
edited down):

    from twisted.internet.defer import inlineCallbacks, returnValue

    @inlineCallbacks
    def remote_uploadRequest(self, job_id, job_file):

        job = yield self.db.run(
            lambda: self.db.session.query(model.Job).get(job_id))
        if not job:
            raise Exception('Invalid job')

        # Mark that it hasn't been uploaded yet
        job_file.uploaded = None

        def saveFile(job_file=job_file):
            self.db.session.save(job_file)
            self.db.session.flush()

        # Insert it into the database
        yield self.db.run(saveFile)

        upload_key = self.fileio.setupUpload(job_file.name)
        returnValue({'port': self.port, 'key': upload_key})

-- David