Thanks a lot, that solves my problem. Also it seems to work well without instantiating the scoped session, so this is perfect.
On Aug 24, 3:13 pm, Michael Bayer <mike...@zzzcomputing.com> wrote: > On Aug 24, 2010, at 6:12 AM, Julien Demoor wrote: > > > > > Hi, > > > I'm using PostgreSQL advisory locks in a multithreaded program. Worker > > threads acquire locks with > > session.execute(select([func.pg_advisory_lock(key)])) during a > > transaction and release them just after session.commit(). Sometimes > > however, the connection behind the thread's session will have changed > > after COMMIT, making it impossible for the thread to release the locks > > it acquired. Note that I'm using scoped_session()'s class methods > > everywhere. > > > A python script to reproduce the problem and its output are attached > > below. > > > Is there a way to force the session to use the same connection within > > a thread ? Would I then have to recycle the connections from time to > > time ? > > I think the most direct way is to bind the Session to a specific connection: > > conn = engine.connect() > sess = Session(bind=conn) > > then when the scope of work with the connection is complete: > > sess.close() > conn.close() > > the "indirect" way would be to play games with the connection pool, but in > this case your application should already have explicit boundaries where > you'd like this connection to stay in play so the above approach is > straightforward. > > > > > Thanks, > > > Julien > > > === sa_pg_advisory_locks.py == > > > # -*- coding: utf8 -*- > > import random > > import os > > import threading > > import time > > > from sqlalchemy import MetaData > > from sqlalchemy.engine import create_engine > > from sqlalchemy.orm import create_session, scoped_session, > > sessionmaker, reconstructor > > from sqlalchemy.sql import func, select > > > sa_engine = create_engine(os.environ['TEST_DSN']) > > > session = scoped_session(lambda: create_session(sa_engine, > > autoflush=True, expire_on_commit=True, autocommit=False)) > > > # Toggle this switch to see the difference in behaviour > > COMMIT_BEFORE_LOCK_RELEASE = True > > # COMMIT_BEFORE_LOCK_RELEASE = False > > > print 'Will commit %s releasing advisory lock' % ('before' if > > COMMIT_BEFORE_LOCK_RELEASE else 'after') > > > # Synchronize program termination > > event = threading.Event() > > > # Test function, will run concurrently in two threads > > def run_test(): > > try: > > i = 0 > > while 1: > > if event.isSet() or i >= 100: > > break > > # Show sign of life > > if i and (i % 50 == 0): > > print i > > key = random.randint(1,2**16) > > pid, _ = session.execute(select([func.pg_backend_pid(), > > func.pg_advisory_lock(key)])).fetchone() > > now = session.execute(select([func.now()])).scalar() > > if COMMIT_BEFORE_LOCK_RELEASE: > > session.commit() > > pid_, unlocked = > > session.execute(select([func.pg_backend_pid(), > > func.pg_advisory_unlock(key)])).fetchone() > > if unlocked: > > assert pid_ == pid > > else: > > raise AssertionError('Iteration %i, acquisition > > pid %i, release > > pid %i\n' % (i, pid, pid_)) > > if not COMMIT_BEFORE_LOCK_RELEASE: > > session.commit() > > i += 1 > > except Exception: > > event.set() > > raise > > event.set() > > > for i in xrange(10): > > thread = threading.Thread(target=run_test) > > thread.daemon = True > > thread.start() > > > event.wait() > > time.sleep(1) > > > == output == > > > u...@host ~ > > $ TEST_DSN=$CPYTHON_DSN python sa_pg_advisory_locks.py > > Will commit before releasing advisory lock > > Exception in thread Thread-10: > > Traceback (most recent call last): > > File "/usr/lib/python2.6/threading.py", line 532, in > > __bootstrap_inner > > self.run() > > File "/usr/lib/python2.6/threading.py", line 484, in run > > self.__target(*self.__args, **self.__kwargs) > > File "sa_pg_advisory_locks.py", line 44, in run_test > > raise AssertionError('Iteration %i, acquisition pid %i, release > > pid %i\n' % (i, pid, pid_)) > > AssertionError: Iteration 6, acquisition pid 16676, release pid 27340 > > > Exception in thread Thread-5: > > Traceback (most recent call last): > > File "/usr/lib/python2.6/threading.py", line 532, in > > __bootstrap_inner > > self.run() > > File "/usr/lib/python2.6/threading.py", line 484, in run > > self.__target(*self.__args, **self.__kwargs) > > File "sa_pg_advisory_locks.py", line 44, in run_test > > raise AssertionError('Iteration %i, acquisition pid %i, release > > pid %i\n' % (i, pid, pid_)) > > AssertionError: Iteration 12, acquisition pid 27340, release pid 16676 > > > Exception in thread Thread-7: > > Traceback (most recent call last): > > File "/usr/lib/python2.6/threading.py", line 532, in > > __bootstrap_inner > > self.run() > > File "/usr/lib/python2.6/threading.py", line 484, in run > > self.__target(*self.__args, **self.__kwargs) > > File "sa_pg_advisory_locks.py", line 44, in run_test > > raise AssertionError('Iteration %i, acquisition pid %i, release > > pid %i\n' % (i, pid, pid_)) > > AssertionError: Iteration 10, acquisition pid 18248, release pid 8452 > > > Exception in thread Thread-3: > > Traceback (most recent call last): > > File "/usr/lib/python2.6/threading.py", line 532, in > > __bootstrap_inner > > self.run() > > File "/usr/lib/python2.6/threading.py", line 484, in run > > self.__target(*self.__args, **self.__kwargs) > > File "sa_pg_advisory_locks.py", line 44, in run_test > > raise AssertionError('Iteration %i, acquisition pid %i, release > > pid %i\n' % (i, pid, pid_)) > > AssertionError: Iteration 18, acquisition pid 8452, release pid 18248 > > > u...@host ~ > > $ TEST_DSN=$JYTHON_DSN jython sa_pg_advisory_locks.py > > Will commit before releasing advisory lock > > Exception in thread Thread:Traceback (most recent call last): > > File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in > > _Thread__bootstrap > > self.run() > > File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run > > Exception in thread Thread:Traceback (most recent call last): > > File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in > > _Thread__bootstrap > > self._target(*self._args, **self._kwargs) > > File "sa_pg_advisory_locks.py", line 44, in run_test > > raise AssertionError('Iteration %i, acquisition pid %i, release > > pid %i\n' % (i, pid, pid_)) > > AssertionError: Iteration 2, acquisition pid 18076, release pid 11332 > > > Exception in thread Thread:Traceback (most recent call last): > > File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in > > _Thread__bootstrap > > self.run() > > File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run > > self._target(*self._args, **self._kwargs) > > File "sa_pg_advisory_locks.py", line 44, in run_test > > self.run() > > File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run > > self._target(*self._args, **self._kwargs) > > File "sa_pg_advisory_locks.py", line 44, in run_test > > Exception in thread Thread:Traceback (most recent call last): > > File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in > > _Thread__bootstrap > > self.run() > > File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run > > self._target(*self._args, **self._kwargs) > > File "sa_pg_advisory_locks.py", line 44, in run_test > > raise AssertionError('Iteration %i, acquisition pid %i, release > > pid %i\n' % (i, pid, pid_)) > > raise AssertionError('Iteration %i, acquisition pid %i, release > > pid %i\n' % (i, pid, pid_)) > > AssertionError: Iteration 10, acquisition pid 11332, release pid 22572 > > > AssertionError: Iteration 5, acquisition pid 22572, release pid 14376 > > > raise AssertionError('Iteration %i, acquisition pid %i, release > > pid %i\n' % (i, pid, pid_)) > > AssertionError: Iteration 5, acquisition pid 14376, release pid 18076 > > > u...@host ~ > > $ TEST_DSN=$CPYTHON_DSN python sa_pg_advisory_locks.py > > Will commit after releasing advisory lock > > 50 > > 50 > > 50 > > 50 > > 50 > > 50 > > 50 > > 50 > > 50 > > 50 > > > u...@host ~ > > $ TEST_DSN=$JYTHON_DSN jython sa_pg_advisory_locks.py > > Will commit after releasing advisory lock > > 50 > > 50 > > 50 > > 50 > > 50 > > 50 > > 50 > > 50 > > 50 > > 50 > > > -- > > You received this message because you are subscribed to the Google Groups > > "sqlalchemy" group. > > To post to this group, send email to sqlalch...@googlegroups.com. > > To unsubscribe from this group, send email to > > sqlalchemy+unsubscr...@googlegroups.com. > > For more options, visit this group > > athttp://groups.google.com/group/sqlalchemy?hl=en. -- You received this message because you are subscribed to the Google Groups "sqlalchemy" group. To post to this group, send email to sqlalch...@googlegroups.com. To unsubscribe from this group, send email to sqlalchemy+unsubscr...@googlegroups.com. For more options, visit this group at http://groups.google.com/group/sqlalchemy?hl=en.