Hi Michael et al, I am banging my ahead into a (so it seems) trivial problem for days now. Basically, I finally need to lock my SQLite database because multiple threads are writing to it. This does not happen to often, but I have a single thread that is dumping thousands of records into the database while the user interface might concurrently do simple updates like "update component set short_description='Foo' where id=10".
Originally the sync thread was slow enough that the SQLite side of locking did work. Now that it is optimized a bit it does not keep up anymore. This leads to all other requests going the "Database is locked" way of failing. 99% of the time all db access is sequential so I figured it would suffice to lock the database before I do anything to it and unlock the database when done. I tried to instrument all code but with the unit of work pattern it is hard to find out where the lock was forgotten... So my current approach is to use the before_execute event to open a lock and when a connection is returned to the pool I unlock it. I attached the code of that mechanism. My first tests with the SQLAlchemy core where promising, but when using the ORM I get a bunch of deadlocks where it seems like the session opens two connections A and B where A locks B out. I can provide more data and example code, but I would first like to know if my approach is completely bogus in your eyes. If it is I am open to better ideas. BTW: Originally I captured before_flush and commit/rollback session events, but this still created locking errors due to read requests going unchecked. Greetings, Torsten -- DYNAmore Gesellschaft fuer Ingenieurdienstleistungen mbH Torsten Landschoff Office Dresden Tel: +49-(0)351-4519587 Fax: +49-(0)351-4519561 mailto:torsten.landsch...@dynamore.de http://www.dynamore.de Registration court: Mannheim, HRB: 109659, based in Karlsruhe, Managing director: Prof. Dr. K. Schweizerhof, Dipl.-Math. U. Franz -- You received this message because you are subscribed to the Google Groups "sqlalchemy" group. To post to this group, send email to sqlalchemy@googlegroups.com. To unsubscribe from this group, send email to sqlalchemy+unsubscr...@googlegroups.com. For more options, visit this group at http://groups.google.com/group/sqlalchemy?hl=en.
# -*- coding: utf-8 -*- import collections import threading import traceback from sqlalchemy import event from sqlalchemy.engine import Connection class DatabaseAutoLocker(object): """ Verwaltet eine Zugriffssperre auf eine Datenbank, die den exklusiven Zugriff durch einen einzelnen Thread sicherstellt. Die gedachte Verwendung ist, eine Instanz dieser Klasse nach dem Erstellen der Engine anzulegen:: engine = create_engine("sqlite:///test.db") DatabaseAutoLocker(engine) Die DatabaseAutoLocker-Instanz "hängt" sich dann über Events an die engine und lebt, solange diese engine existiert. Daher braucht man keine Referenz auf den Locker aufzubewahren. """ def __init__(self, engine, timeout=None): """ Erstellt einen Autolocker für die in *engine* gegebene Datenbank-Engine. :param engine: Datenbank-Engine als Instanz von sqlalchemy.engine :param timeout: Zeit in Sekunden, die ein Client auf die Datenbankverbindung wartet, bevor eine Exception geworfen wird. None (default) deaktiviert den Timeout und der Client wartet bis zuletzt. """ self.timeout = timeout #: Schützt den Zugriff auf interne Daten self._mutex = threading.RLock() #: Enthält die Liste der noch auf die Datenbanksperre wartenden Clienten self._pending_requests = collections.deque() #: Aktuell aktive Verbindung (diese hat die Datenbank für sich gesperrt). None, #: wenn keine Verbindung die Sperre hat. self._active_dbapi_connection = None #: Wenn aktiviert liefert dies den Traceback des Aufrufers, der die Datenbank #: gegenwärtig gesperrt hält, sonst immer None. self._active_locker_traceback = None event.listen(engine, "before_execute", self.__connection_event_before_execute) event.listen(engine, "checkin", self.__pool_event_checkin) def __connection_event_before_execute(self, conn, clauseelement, multiparams, params): """ Registriert die erste Ausführung eines Kommandos über eine Datenbankverbindung. Hier muss die Datenbank für andere Verbindung gesperrt werden. """ dbapi_connection = _get_dbapi_connection(conn) request = None with self._mutex: if self._active_dbapi_connection is dbapi_connection: # Nichts zu tun, die Verbindung ist schon im Besitz der Sperre. return locker_traceback = None if self.timeout is not None: locker_traceback = traceback.format_stack() if self._active_dbapi_connection is None: # Keine andere Verbindung aktiv, dann ist dies jetzt die aktive Verbindung self._active_dbapi_connection = dbapi_connection self._active_locker_traceback = locker_traceback else: # Ansonsten müssen wir einen Antrag stellen request = _Request(dbapi_connection, locker_traceback) self._pending_requests.append(request) if request: request.wait(self.timeout) with self._mutex: if dbapi_connection is self._active_dbapi_connection: # Wir wurden durch den vorigen Client als neuer Besitzer der Datenbank # eingetragen. pass else: # Dann war es ein Timeout und wir geben auf. Demnach sollte der Request # noch in der Liste von wartenden Anfragen sein und wir entfernen uns # wieder. self._pending_requests.remove(request) self.__raise_locked_exception() def __raise_locked_exception(self): if self._active_locker_traceback: raise DatabaseLockedException( "Database is locked by connection {0!r}, established here:\n{1}." .format(self._active_dbapi_connection, "".join(self._active_locker_traceback))) else: raise DatabaseLockedException( "Database is locked by connection {0!r}.".format(self._active_dbapi_connection)) def __pool_event_checkin(self, dbapi_connection, connection_record): """ Reagiert auf die Rückgabe einer Datenbankverbindung an den Pool. In diesem Moment ist klar, dass mit der Verbindung nichts mehr gemacht wird und wir können die Sperre dafür aufheben. """ with self._mutex: if dbapi_connection is self._active_dbapi_connection: # Der eine Client ist fertig, Datenbank freigeben. self._active_dbapi_connection = None # Wenn jemand wartet, diesen gleich benachrichtigen und ihn die Sperre # übernehmen lassen. if self._pending_requests: request = self._pending_requests.popleft() self._active_dbapi_connection = request.dbapi_connection self._active_locker_traceback = request.locker_traceback request.notify() class _Request(object): __slots__ = "dbapi_connection", "event", "locker_traceback" def __init__(self, dbapi_connection, locker_traceback): self.dbapi_connection = dbapi_connection self.locker_traceback = locker_traceback self.event = threading.Event() def notify(self): self.event.set() def wait(self, timeout): self.event.wait(timeout) class DatabaseLockedException(Exception): u""" Ein anderer Thread hat die Datenbank solange gesperrt gehalten, dass der Versuch, diese zu sperren, gescheitert ist. """ pass def _get_dbapi_connection(conn): if not isinstance(conn, Connection): raise TypeError("{0!r} is not a SQLAlchemy connection.".format(conn)) dbapi_connection = conn.connection.connection return dbapi_connection