Hi, I'm using PostgreSQL advisory locks in a multithreaded program. Worker threads acquire locks with session.execute(select([func.pg_advisory_lock(key)])) during a transaction and release them just after session.commit(). Sometimes, however, the connection behind the thread's session will have changed after COMMIT, making it impossible for the thread to release the locks it acquired. Note that I'm using scoped_session()'s class methods everywhere.
A python script to reproduce the problem and its output are attached below. Is there a way to force the session to use the same connection within a thread ? Would I then have to recycle the connections from time to time ? Thanks, Julien === sa_pg_advisory_locks.py == # -*- coding: utf8 -*- import random import os import threading import time from sqlalchemy import MetaData from sqlalchemy.engine import create_engine from sqlalchemy.orm import create_session, scoped_session, sessionmaker, reconstructor from sqlalchemy.sql import func, select sa_engine = create_engine(os.environ['TEST_DSN']) session = scoped_session(lambda: create_session(sa_engine, autoflush=True, expire_on_commit=True, autocommit=False)) # Toggle this switch to see the difference in behaviour COMMIT_BEFORE_LOCK_RELEASE = True # COMMIT_BEFORE_LOCK_RELEASE = False print 'Will commit %s releasing advisory lock' % ('before' if COMMIT_BEFORE_LOCK_RELEASE else 'after') # Synchronize program termination event = threading.Event() # Test function, will run concurrently in two threads def run_test(): try: i = 0 while 1: if event.isSet() or i >= 100: break # Show sign of life if i and (i % 50 == 0): print i key = random.randint(1,2**16) pid, _ = session.execute(select([func.pg_backend_pid(), func.pg_advisory_lock(key)])).fetchone() now = session.execute(select([func.now()])).scalar() if COMMIT_BEFORE_LOCK_RELEASE: session.commit() pid_, unlocked = session.execute(select([func.pg_backend_pid(), func.pg_advisory_unlock(key)])).fetchone() if unlocked: assert pid_ == pid else: raise AssertionError('Iteration %i, acquisition pid %i, release pid %i\n' % (i, pid, pid_)) if not COMMIT_BEFORE_LOCK_RELEASE: session.commit() i += 1 except Exception: event.set() raise event.set() for i in xrange(10): thread = threading.Thread(target=run_test) thread.daemon = True thread.start() event.wait() time.sleep(1) == output == u...@host ~ $ TEST_DSN=$CPYTHON_DSN python sa_pg_advisory_locks.py Will commit before releasing advisory 
lock Exception in thread Thread-10: Traceback (most recent call last): File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner self.run() File "/usr/lib/python2.6/threading.py", line 484, in run self.__target(*self.__args, **self.__kwargs) File "sa_pg_advisory_locks.py", line 44, in run_test raise AssertionError('Iteration %i, acquisition pid %i, release pid %i\n' % (i, pid, pid_)) AssertionError: Iteration 6, acquisition pid 16676, release pid 27340 Exception in thread Thread-5: Traceback (most recent call last): File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner self.run() File "/usr/lib/python2.6/threading.py", line 484, in run self.__target(*self.__args, **self.__kwargs) File "sa_pg_advisory_locks.py", line 44, in run_test raise AssertionError('Iteration %i, acquisition pid %i, release pid %i\n' % (i, pid, pid_)) AssertionError: Iteration 12, acquisition pid 27340, release pid 16676 Exception in thread Thread-7: Traceback (most recent call last): File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner self.run() File "/usr/lib/python2.6/threading.py", line 484, in run self.__target(*self.__args, **self.__kwargs) File "sa_pg_advisory_locks.py", line 44, in run_test raise AssertionError('Iteration %i, acquisition pid %i, release pid %i\n' % (i, pid, pid_)) AssertionError: Iteration 10, acquisition pid 18248, release pid 8452 Exception in thread Thread-3: Traceback (most recent call last): File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner self.run() File "/usr/lib/python2.6/threading.py", line 484, in run self.__target(*self.__args, **self.__kwargs) File "sa_pg_advisory_locks.py", line 44, in run_test raise AssertionError('Iteration %i, acquisition pid %i, release pid %i\n' % (i, pid, pid_)) AssertionError: Iteration 18, acquisition pid 8452, release pid 18248 u...@host ~ $ TEST_DSN=$JYTHON_DSN jython sa_pg_advisory_locks.py Will commit before releasing advisory lock Exception in thread 
Thread:Traceback (most recent call last): File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in _Thread__bootstrap self.run() File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run Exception in thread Thread:Traceback (most recent call last): File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in _Thread__bootstrap self._target(*self._args, **self._kwargs) File "sa_pg_advisory_locks.py", line 44, in run_test raise AssertionError('Iteration %i, acquisition pid %i, release pid %i\n' % (i, pid, pid_)) AssertionError: Iteration 2, acquisition pid 18076, release pid 11332 Exception in thread Thread:Traceback (most recent call last): File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in _Thread__bootstrap self.run() File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run self._target(*self._args, **self._kwargs) File "sa_pg_advisory_locks.py", line 44, in run_test self.run() File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run self._target(*self._args, **self._kwargs) File "sa_pg_advisory_locks.py", line 44, in run_test Exception in thread Thread:Traceback (most recent call last): File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in _Thread__bootstrap self.run() File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run self._target(*self._args, **self._kwargs) File "sa_pg_advisory_locks.py", line 44, in run_test raise AssertionError('Iteration %i, acquisition pid %i, release pid %i\n' % (i, pid, pid_)) raise AssertionError('Iteration %i, acquisition pid %i, release pid %i\n' % (i, pid, pid_)) AssertionError: Iteration 10, acquisition pid 11332, release pid 22572 AssertionError: Iteration 5, acquisition pid 22572, release pid 14376 raise AssertionError('Iteration %i, acquisition pid %i, release pid %i\n' % (i, pid, pid_)) AssertionError: Iteration 5, acquisition pid 14376, release pid 18076 u...@host ~ $ TEST_DSN=$CPYTHON_DSN python sa_pg_advisory_locks.py Will commit after 
releasing advisory lock 50 50 50 50 50 50 50 50 50 50 u...@host ~ $ TEST_DSN=$JYTHON_DSN jython sa_pg_advisory_locks.py Will commit after releasing advisory lock 50 50 50 50 50 50 50 50 50 50 -- You received this message because you are subscribed to the Google Groups "sqlalchemy" group. To post to this group, send email to sqlalch...@googlegroups.com. To unsubscribe from this group, send email to sqlalchemy+unsubscr...@googlegroups.com. For more options, visit this group at http://groups.google.com/group/sqlalchemy?hl=en.