[sqlalchemy] Session uses new connection after commit
Hi,

I'm using PostgreSQL advisory locks in a multithreaded program. Worker threads acquire locks with session.execute(select([func.pg_advisory_lock(key)])) during a transaction and release them just after session.commit(). Sometimes, however, the connection behind the thread's session has changed after COMMIT, making it impossible for the thread to release the locks it acquired. Note that I'm using scoped_session()'s class methods everywhere.

A Python script that reproduces the problem, and its output, are attached below.

Is there a way to force the session to use the same connection within a thread? Would I then have to recycle the connections from time to time?

Thanks,
Julien

=== sa_pg_advisory_locks.py ==
# -*- coding: utf8 -*-
import random
import os
import threading
import time

from sqlalchemy import MetaData
from sqlalchemy.engine import create_engine
from sqlalchemy.orm import create_session, scoped_session, sessionmaker, reconstructor
from sqlalchemy.sql import func, select

sa_engine = create_engine(os.environ['TEST_DSN'])
session = scoped_session(lambda: create_session(sa_engine, autoflush=True, expire_on_commit=True, autocommit=False))

# Toggle this switch to see the difference in behaviour
COMMIT_BEFORE_LOCK_RELEASE = True
# COMMIT_BEFORE_LOCK_RELEASE = False

print 'Will commit %s releasing advisory lock' % ('before' if COMMIT_BEFORE_LOCK_RELEASE else 'after')

# Synchronize program termination
event = threading.Event()

# Test function, will run concurrently in several threads
def run_test():
    try:
        i = 0
        while 1:
            if event.isSet() or i >= 100:
                break
            # Show sign of life
            if i and (i % 50 == 0):
                print i
            key = random.randint(1,2**16)
            pid, _ = session.execute(select([func.pg_backend_pid(), func.pg_advisory_lock(key)])).fetchone()
            now = session.execute(select([func.now()])).scalar()
            if COMMIT_BEFORE_LOCK_RELEASE:
                session.commit()
            pid_, unlocked = session.execute(select([func.pg_backend_pid(), func.pg_advisory_unlock(key)])).fetchone()
            if unlocked:
                assert pid_ == pid
            else:
                raise AssertionError('Iteration %i, acquisition pid %i, release pid %i\n' % (i, pid, pid_))
            if not COMMIT_BEFORE_LOCK_RELEASE:
                session.commit()
            i += 1
    except Exception:
        event.set()
        raise
    event.set()

for i in xrange(10):
    thread = threading.Thread(target=run_test)
    thread.daemon = True
    thread.start()

event.wait()
time.sleep(1)

== output ==
u...@host ~ $ TEST_DSN=$CPYTHON_DSN python sa_pg_advisory_locks.py
Will commit before releasing advisory lock
Exception in thread Thread-10:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "sa_pg_advisory_locks.py", line 44, in run_test
    raise AssertionError('Iteration %i, acquisition pid %i, release pid %i\n' % (i, pid, pid_))
AssertionError: Iteration 6, acquisition pid 16676, release pid 27340

Exception in thread Thread-5:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "sa_pg_advisory_locks.py", line 44, in run_test
    raise AssertionError('Iteration %i, acquisition pid %i, release pid %i\n' % (i, pid, pid_))
AssertionError: Iteration 12, acquisition pid 27340, release pid 16676

Exception in thread Thread-7:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "sa_pg_advisory_locks.py", line 44, in run_test
    raise AssertionError('Iteration %i, acquisition pid %i, release pid %i\n' % (i, pid, pid_))
AssertionError: Iteration 10, acquisition pid 18248, release pid 8452

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "sa_pg_advisory_locks.py", line 44, in run_test
    raise AssertionError('Iteration %i, acquisition pid %i, release pid %i\n' % (i, pid, pid_))
AssertionError: Iteration 18, acquisition pid 8452, release pid 18248

u...@host ~ $
Re: [sqlalchemy] Session uses new connection after commit
On Aug 24, 2010, at 6:12 AM, Julien Demoor wrote:

> Hi, I'm using PostgreSQL advisory locks in a multithreaded program. Worker threads acquire locks with session.execute(select([func.pg_advisory_lock(key)])) during a transaction and release them just after session.commit(). Sometimes however, the connection behind the thread's session will have changed after COMMIT, making it impossible for the thread to release the locks it acquired. Note that I'm using scoped_session()'s class methods everywhere. A python script to reproduce the problem and its output are attached below.
>
> Is there a way to force the session to use the same connection within a thread? Would I then have to recycle the connections from time to time?

I think the most direct way is to bind the Session to a specific connection:

    conn = engine.connect()
    sess = Session(bind=conn)

Then, when the scope of work with the connection is complete:

    sess.close()
    conn.close()

The indirect way would be to play games with the connection pool, but in this case your application should already have explicit boundaries where you'd like this connection to stay in play, so the above approach is straightforward.
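As a minimal sketch of the binding approach, the two steps could be wrapped in a context manager so that the close calls always run. The helper names pinned_session and run_with_advisory_lock are hypothetical and not part of SQLAlchemy. The sketch assumes a reasonably recent SQLAlchemy and issues the lock calls as raw SQL through text() instead of the func construct used in the script. The lock helper needs a PostgreSQL engine to do anything useful.

```python
from contextlib import contextmanager

from sqlalchemy import text
from sqlalchemy.orm import Session


@contextmanager
def pinned_session(engine):
    """Yield a Session bound to a single Connection for the whole block.

    Hypothetical helper: the Session cannot switch to another pooled
    connection after commit() because it never asks the pool for one.
    """
    conn = engine.connect()
    sess = Session(bind=conn)
    try:
        yield sess
    finally:
        sess.close()
        conn.close()  # hands the connection back to the pool


def run_with_advisory_lock(engine, key, work):
    """Hold a session-level advisory lock around work(sess).

    Hypothetical helper; PostgreSQL only. `work` is any callable that
    takes the Session.
    """
    with pinned_session(engine) as sess:
        sess.execute(text("SELECT pg_advisory_lock(:key)"), {"key": key})
        try:
            work(sess)
            sess.commit()
        except Exception:
            # Clear the failed transaction so the unlock below can run.
            sess.rollback()
            raise
        finally:
            # Runs on the connection that took the lock.
            sess.execute(text("SELECT pg_advisory_unlock(:key)"), {"key": key})
            sess.commit()
```

Session-level advisory locks are not released by COMMIT or ROLLBACK, which is why the unlock is issued explicitly before the connection goes back to the pool.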