Hi Michael et al,

I am banging my ahead into a (so it seems) trivial problem for days now.
Basically, I finally need to lock my SQLite database because multiple
threads are writing to it. This does not happen to often, but I have a
single thread that is dumping thousands of records into the database
while the user interface might concurrently do simple updates like
"update component set short_description='Foo' where id=10".

Originally the sync thread was slow enough that the SQLite side of
locking did work. Now that it is optimized a bit it does not keep up
anymore. This leads to all other requests going the "Database is locked"
way of failing.

99% of the time all db access is sequential so I figured it would
suffice to lock the database before I do anything to it and unlock the
database when done. I tried to instrument all code but with the unit of
work pattern it is hard to find out where the lock was forgotten...

So my current approach is to use the before_execute event to open a lock
and when a connection is returned to the pool I unlock it. I attached
the code of that mechanism.

My first tests with the SQLAlchemy core where promising, but when using
the ORM I get a bunch of deadlocks where it seems like the session opens
two connections A and B where A locks B out. I can provide more data and
example code, but I would first like to know if my approach is
completely bogus in your eyes.

If it is I am open to better ideas.

BTW: Originally I captured before_flush and commit/rollback session
events, but this still created locking errors due to read requests going
unchecked.

Greetings, Torsten

-- 
DYNAmore Gesellschaft fuer Ingenieurdienstleistungen mbH
Torsten Landschoff

Office Dresden
Tel: +49-(0)351-4519587
Fax: +49-(0)351-4519561

mailto:torsten.landsch...@dynamore.de
http://www.dynamore.de

Registration court: Mannheim, HRB: 109659, based in Karlsruhe,
Managing director:  Prof. Dr. K. Schweizerhof, Dipl.-Math. U. Franz

-- 
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To post to this group, send email to sqlalchemy@googlegroups.com.
To unsubscribe from this group, send email to 
sqlalchemy+unsubscr...@googlegroups.com.
For more options, visit this group at 
http://groups.google.com/group/sqlalchemy?hl=en.

# -*- coding: utf-8 -*-

import collections
import threading
import traceback
from sqlalchemy import event
from sqlalchemy.engine import Connection


class DatabaseAutoLocker(object):
    """
    Verwaltet eine Zugriffssperre auf eine Datenbank, die den exklusiven Zugriff
    durch einen einzelnen Thread sicherstellt. Die gedachte Verwendung ist, eine
    Instanz dieser Klasse nach dem Erstellen der Engine anzulegen::

       engine = create_engine("sqlite:///test.db")
       DatabaseAutoLocker(engine)

    Die DatabaseAutoLocker-Instanz "hängt" sich dann über Events an die engine und
    lebt, solange diese engine existiert. Daher braucht man keine Referenz auf
    den Locker aufzubewahren.
    """

    def __init__(self, engine, timeout=None):
        """
        Erstellt einen Autolocker für die in *engine* gegebene Datenbank-Engine.

        :param engine: Datenbank-Engine als Instanz von sqlalchemy.engine

        :param timeout: Zeit in Sekunden, die ein Client auf die Datenbankverbindung
                wartet, bevor eine Exception geworfen wird. None (default) deaktiviert
                den Timeout und der Client wartet bis zuletzt.
        """
        self.timeout = timeout

        #: Schützt den Zugriff auf interne Daten
        self._mutex = threading.RLock()

        #: Enthält die Liste der noch auf die Datenbanksperre wartenden Clienten
        self._pending_requests = collections.deque()

        #: Aktuell aktive Verbindung (diese hat die Datenbank für sich gesperrt). None,
        #: wenn keine Verbindung die Sperre hat.
        self._active_dbapi_connection = None

        #: Wenn aktiviert liefert dies den Traceback des Aufrufers, der die Datenbank
        #: gegenwärtig gesperrt hält, sonst immer None.
        self._active_locker_traceback = None

        event.listen(engine, "before_execute", self.__connection_event_before_execute)
        event.listen(engine, "checkin", self.__pool_event_checkin)

    def __connection_event_before_execute(self, conn, clauseelement, multiparams, params):
        """
        Registriert die erste Ausführung eines Kommandos über eine Datenbankverbindung.
        Hier muss die Datenbank für andere Verbindung gesperrt werden.
        """
        dbapi_connection = _get_dbapi_connection(conn)
        request = None
        with self._mutex:
            if self._active_dbapi_connection is dbapi_connection:
                # Nichts zu tun, die Verbindung ist schon im Besitz der Sperre.
                return

            locker_traceback = None
            if self.timeout is not None:
                locker_traceback = traceback.format_stack()

            if self._active_dbapi_connection is None:
                # Keine andere Verbindung aktiv, dann ist dies jetzt die aktive Verbindung
                self._active_dbapi_connection = dbapi_connection
                self._active_locker_traceback = locker_traceback
            else:
                # Ansonsten müssen wir einen Antrag stellen
                request = _Request(dbapi_connection, locker_traceback)
                self._pending_requests.append(request)

        if request:
            request.wait(self.timeout)
            with self._mutex:
                if dbapi_connection is self._active_dbapi_connection:
                    # Wir wurden durch den vorigen Client als neuer Besitzer der Datenbank
                    # eingetragen.
                    pass
                else:
                    # Dann war es ein Timeout und wir geben auf. Demnach sollte der Request
                    # noch in der Liste von wartenden Anfragen sein und wir entfernen uns
                    # wieder.
                    self._pending_requests.remove(request)
                    self.__raise_locked_exception()

    def __raise_locked_exception(self):
        if self._active_locker_traceback:
            raise DatabaseLockedException(
                    "Database is locked by connection {0!r}, established here:\n{1}."
                    .format(self._active_dbapi_connection, "".join(self._active_locker_traceback)))
        else:
            raise DatabaseLockedException(
                    "Database is locked by connection {0!r}.".format(self._active_dbapi_connection))

    def __pool_event_checkin(self, dbapi_connection, connection_record):
        """
        Reagiert auf die Rückgabe einer Datenbankverbindung an den Pool. In
        diesem Moment ist klar, dass mit der Verbindung nichts mehr gemacht
        wird und wir können die Sperre dafür aufheben.
        """
        with self._mutex:
            if dbapi_connection is self._active_dbapi_connection:
                # Der eine Client ist fertig, Datenbank freigeben.
                self._active_dbapi_connection = None
                # Wenn jemand wartet, diesen gleich benachrichtigen und ihn die Sperre
                # übernehmen lassen.
                if self._pending_requests:
                    request = self._pending_requests.popleft()
                    self._active_dbapi_connection = request.dbapi_connection
                    self._active_locker_traceback = request.locker_traceback
                    request.notify()


class _Request(object):
    __slots__ = "dbapi_connection", "event", "locker_traceback"

    def __init__(self, dbapi_connection, locker_traceback):
        self.dbapi_connection = dbapi_connection
        self.locker_traceback = locker_traceback
        self.event = threading.Event()

    def notify(self):
        self.event.set()

    def wait(self, timeout):
        self.event.wait(timeout)


class DatabaseLockedException(Exception):
    u"""
    Ein anderer Thread hat die Datenbank solange gesperrt gehalten, dass der
    Versuch, diese zu sperren, gescheitert ist.
    """
    pass


def _get_dbapi_connection(conn):
    if not isinstance(conn, Connection):
        raise TypeError("{0!r} is not a SQLAlchemy connection.".format(conn))
    dbapi_connection = conn.connection.connection
    return dbapi_connection

Reply via email to