Thanks a lot, that solves my problem. Also it seems to work well
without instantiating the scoped session, so this is perfect.

On Aug 24, 3:13 pm, Michael Bayer <mike...@zzzcomputing.com> wrote:
> On Aug 24, 2010, at 6:12 AM, Julien Demoor wrote:
>
>
>
> > Hi,
>
> > I'm using PostgreSQL advisory locks in a multithreaded program. Worker
> > threads acquire locks with
> > session.execute(select([func.pg_advisory_lock(key)])) during a
> > transaction and release them just after session.commit(). Sometimes,
> > however, the connection behind the thread's session will have changed
> > after COMMIT, making it impossible for the thread to release the locks
> > it acquired. Note that I'm using scoped_session()'s class methods
> > everywhere.
>
> > A python script to reproduce the problem and its output are attached
> > below.
>
> > Is there a way to force the session to use the same connection within
> > a thread? Would I then have to recycle the connections from time to
> > time?
>
> I think the most direct way is to bind the Session to a specific connection:
>
> conn = engine.connect()
> sess = Session(bind=conn)
>
> then when the scope of work with the connection is complete:
>
> sess.close()
> conn.close()
>
> the "indirect" way would be to play games with the connection pool, but in 
> this case your application should already have explicit boundaries where 
> you'd like this connection to stay in play so the above approach is 
> straightforward.
>
>
>
> > Thanks,
>
> > Julien
>
> > === sa_pg_advisory_locks.py ==
>
> > # -*- coding: utf8 -*-
> > import random
> > import os
> > import threading
> > import time
>
> > from sqlalchemy import MetaData
> > from sqlalchemy.engine import create_engine
> > from sqlalchemy.orm import create_session, scoped_session,
> > sessionmaker, reconstructor
> > from sqlalchemy.sql import func, select
>
> > sa_engine = create_engine(os.environ['TEST_DSN'])
>
> > session = scoped_session(lambda: create_session(sa_engine,
> > autoflush=True, expire_on_commit=True, autocommit=False))
>
> > # Toggle this switch to see the difference in behaviour
> > COMMIT_BEFORE_LOCK_RELEASE = True
> > # COMMIT_BEFORE_LOCK_RELEASE = False
>
> > print 'Will commit %s releasing advisory lock' % ('before' if
> > COMMIT_BEFORE_LOCK_RELEASE else 'after')
>
> > # Synchronize program termination
> > event = threading.Event()
>
> > # Test function, will run concurrently in two threads
> > def run_test():
> >    try:
> >            i = 0
> >            while 1:
> >                    if event.isSet() or i >= 100:
> >                            break
> >                    # Show sign of life
> >                    if i and (i % 50 == 0):
> >                            print i
> >                    key = random.randint(1,2**16)
> >                    pid, _ = session.execute(select([func.pg_backend_pid(),
> > func.pg_advisory_lock(key)])).fetchone()
> >                    now = session.execute(select([func.now()])).scalar()
> >                    if COMMIT_BEFORE_LOCK_RELEASE:
> >                            session.commit()
> >                    pid_, unlocked = 
> > session.execute(select([func.pg_backend_pid(),
> > func.pg_advisory_unlock(key)])).fetchone()
> >                    if unlocked:
> >                            assert pid_ == pid
> >                    else:
> >                            raise AssertionError('Iteration %i, acquisition 
> > pid %i, release
> > pid %i\n' % (i, pid, pid_))
> >                    if not COMMIT_BEFORE_LOCK_RELEASE:
> >                            session.commit()
> >                    i += 1
> >    except Exception:
> >            event.set()
> >            raise
> >    event.set()
>
> > for i in xrange(10):
> >    thread = threading.Thread(target=run_test)
> >    thread.daemon = True
> >    thread.start()
>
> > event.wait()
> > time.sleep(1)
>
> > == output ==
>
> > u...@host ~
> > $ TEST_DSN=$CPYTHON_DSN python sa_pg_advisory_locks.py
> > Will commit before releasing advisory lock
> > Exception in thread Thread-10:
> > Traceback (most recent call last):
> >  File "/usr/lib/python2.6/threading.py", line 532, in
> > __bootstrap_inner
> >    self.run()
> >  File "/usr/lib/python2.6/threading.py", line 484, in run
> >    self.__target(*self.__args, **self.__kwargs)
> >  File "sa_pg_advisory_locks.py", line 44, in run_test
> >    raise AssertionError('Iteration %i, acquisition pid %i, release
> > pid %i\n' % (i, pid, pid_))
> > AssertionError: Iteration 6, acquisition pid 16676, release pid 27340
>
> > Exception in thread Thread-5:
> > Traceback (most recent call last):
> >  File "/usr/lib/python2.6/threading.py", line 532, in
> > __bootstrap_inner
> >    self.run()
> >  File "/usr/lib/python2.6/threading.py", line 484, in run
> >    self.__target(*self.__args, **self.__kwargs)
> >  File "sa_pg_advisory_locks.py", line 44, in run_test
> >    raise AssertionError('Iteration %i, acquisition pid %i, release
> > pid %i\n' % (i, pid, pid_))
> > AssertionError: Iteration 12, acquisition pid 27340, release pid 16676
>
> > Exception in thread Thread-7:
> > Traceback (most recent call last):
> >  File "/usr/lib/python2.6/threading.py", line 532, in
> > __bootstrap_inner
> >    self.run()
> >  File "/usr/lib/python2.6/threading.py", line 484, in run
> >    self.__target(*self.__args, **self.__kwargs)
> >  File "sa_pg_advisory_locks.py", line 44, in run_test
> >    raise AssertionError('Iteration %i, acquisition pid %i, release
> > pid %i\n' % (i, pid, pid_))
> > AssertionError: Iteration 10, acquisition pid 18248, release pid 8452
>
> > Exception in thread Thread-3:
> > Traceback (most recent call last):
> >  File "/usr/lib/python2.6/threading.py", line 532, in
> > __bootstrap_inner
> >    self.run()
> >  File "/usr/lib/python2.6/threading.py", line 484, in run
> >    self.__target(*self.__args, **self.__kwargs)
> >  File "sa_pg_advisory_locks.py", line 44, in run_test
> >    raise AssertionError('Iteration %i, acquisition pid %i, release
> > pid %i\n' % (i, pid, pid_))
> > AssertionError: Iteration 18, acquisition pid 8452, release pid 18248
>
> > u...@host ~
> > $ TEST_DSN=$JYTHON_DSN jython sa_pg_advisory_locks.py
> > Will commit before releasing advisory lock
> > Exception in thread Thread:Traceback (most recent call last):
> >  File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in
> > _Thread__bootstrap
> >    self.run()
> >  File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run
> > Exception in thread Thread:Traceback (most recent call last):
> >  File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in
> > _Thread__bootstrap
> >    self._target(*self._args, **self._kwargs)
> >  File "sa_pg_advisory_locks.py", line 44, in run_test
> >    raise AssertionError('Iteration %i, acquisition pid %i, release
> > pid %i\n' % (i, pid, pid_))
> > AssertionError: Iteration 2, acquisition pid 18076, release pid 11332
>
> > Exception in thread Thread:Traceback (most recent call last):
> >  File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in
> > _Thread__bootstrap
> >    self.run()
> >  File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run
> >    self._target(*self._args, **self._kwargs)
> >  File "sa_pg_advisory_locks.py", line 44, in run_test
> >    self.run()
> >  File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run
> >    self._target(*self._args, **self._kwargs)
> >  File "sa_pg_advisory_locks.py", line 44, in run_test
> > Exception in thread Thread:Traceback (most recent call last):
> >  File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in
> > _Thread__bootstrap
> >    self.run()
> >  File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run
> >    self._target(*self._args, **self._kwargs)
> >  File "sa_pg_advisory_locks.py", line 44, in run_test
> >    raise AssertionError('Iteration %i, acquisition pid %i, release
> > pid %i\n' % (i, pid, pid_))
> >    raise AssertionError('Iteration %i, acquisition pid %i, release
> > pid %i\n' % (i, pid, pid_))
> > AssertionError: Iteration 10, acquisition pid 11332, release pid 22572
>
> > AssertionError: Iteration 5, acquisition pid 22572, release pid 14376
>
> >    raise AssertionError('Iteration %i, acquisition pid %i, release
> > pid %i\n' % (i, pid, pid_))
> > AssertionError: Iteration 5, acquisition pid 14376, release pid 18076
>
> > u...@host ~
> > $ TEST_DSN=$CPYTHON_DSN python sa_pg_advisory_locks.py
> > Will commit after releasing advisory lock
> > 50
> > 50
> > 50
> > 50
> > 50
> > 50
> > 50
> > 50
> > 50
> > 50
>
> > u...@host ~
> > $ TEST_DSN=$JYTHON_DSN jython sa_pg_advisory_locks.py
> > Will commit after releasing advisory lock
> > 50
> > 50
> > 50
> > 50
> > 50
> > 50
> > 50
> > 50
> > 50
> > 50
>
> > --
> > You received this message because you are subscribed to the Google Groups 
> > "sqlalchemy" group.
> > To post to this group, send email to sqlalch...@googlegroups.com.
> > To unsubscribe from this group, send email to 
> > sqlalchemy+unsubscr...@googlegroups.com.
> > For more options, visit this group at
> > http://groups.google.com/group/sqlalchemy?hl=en.

-- 
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To post to this group, send email to sqlalch...@googlegroups.com.
To unsubscribe from this group, send email to 
sqlalchemy+unsubscr...@googlegroups.com.
For more options, visit this group at 
http://groups.google.com/group/sqlalchemy?hl=en.

Reply via email to