On Aug 24, 2010, at 6:12 AM, Julien Demoor wrote:

> Hi,
> 
> I'm using PostgreSQL advisory locks in a multithreaded program. Worker
> threads acquire locks with
> session.execute(select([func.pg_advisory_lock(key)])) during a
> transaction and release them just after session.commit(). Sometimes,
> however, the connection behind the thread's session will have changed
> after COMMIT, making it impossible for the thread to release the locks
> it acquired. Note that I'm using scoped_session()'s class methods
> everywhere.
> 
> A python script to reproduce the problem and its output are attached
> below.
> 
> Is there a way to force the session to use the same connection within
> a thread? Would I then have to recycle the connections from time to
> time?

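I think the most direct way is to bind the Session to a specific connection.
A Session bound to the engine returns its connection to the pool when the
transaction ends (e.g. on commit) and may check out a different one for the
next statement; a Session bound to a Connection keeps using that connection: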

conn = engine.connect()
sess = Session(bind=conn)


Then, when the scope of work with the connection is complete:

sess.close()
conn.close()

The "indirect" way would be to play games with the connection pool, but in this 
case your application should already have explicit boundaries within which you'd 
like this connection to stay in play, so the above approach is straightforward.



> 
> Thanks,
> 
> Julien
> 
> === sa_pg_advisory_locks.py ==
> 
> # -*- coding: utf8 -*-
> import random
> import os
> import threading
> import time
> 
> from sqlalchemy import MetaData
> from sqlalchemy.engine import create_engine
> from sqlalchemy.orm import create_session, scoped_session,
> sessionmaker, reconstructor
> from sqlalchemy.sql import func, select
> 
> sa_engine = create_engine(os.environ['TEST_DSN'])
> 
> session = scoped_session(lambda: create_session(sa_engine,
> autoflush=True, expire_on_commit=True, autocommit=False))
> 
> # Toggle this switch to see the difference in behaviour
> COMMIT_BEFORE_LOCK_RELEASE = True
> # COMMIT_BEFORE_LOCK_RELEASE = False
> 
> print 'Will commit %s releasing advisory lock' % ('before' if
> COMMIT_BEFORE_LOCK_RELEASE else 'after')
> 
> # Synchronize program termination
> event = threading.Event()
> 
> # Test function, will run concurrently in two threads
> def run_test():
>       try:
>               i = 0
>               while 1:
>                       if event.isSet() or i >= 100:
>                               break
>                       # Show sign of life
>                       if i and (i % 50 == 0):
>                               print i
>                       key = random.randint(1,2**16)
>                       pid, _ = session.execute(select([func.pg_backend_pid(),
> func.pg_advisory_lock(key)])).fetchone()
>                       now = session.execute(select([func.now()])).scalar()
>                       if COMMIT_BEFORE_LOCK_RELEASE:
>                               session.commit()
>                       pid_, unlocked = 
> session.execute(select([func.pg_backend_pid(),
> func.pg_advisory_unlock(key)])).fetchone()
>                       if unlocked:
>                               assert pid_ == pid
>                       else:
>                               raise AssertionError('Iteration %i, acquisition 
> pid %i, release
> pid %i\n' % (i, pid, pid_))
>                       if not COMMIT_BEFORE_LOCK_RELEASE:
>                               session.commit()
>                       i += 1
>       except Exception:
>               event.set()
>               raise
>       event.set()
> 
> for i in xrange(10):
>       thread = threading.Thread(target=run_test)
>       thread.daemon = True
>       thread.start()
> 
> event.wait()
> time.sleep(1)
> 
> == output ==
> 
> u...@host ~
> $ TEST_DSN=$CPYTHON_DSN python sa_pg_advisory_locks.py
> Will commit before releasing advisory lock
> Exception in thread Thread-10:
> Traceback (most recent call last):
>  File "/usr/lib/python2.6/threading.py", line 532, in
> __bootstrap_inner
>    self.run()
>  File "/usr/lib/python2.6/threading.py", line 484, in run
>    self.__target(*self.__args, **self.__kwargs)
>  File "sa_pg_advisory_locks.py", line 44, in run_test
>    raise AssertionError('Iteration %i, acquisition pid %i, release
> pid %i\n' % (i, pid, pid_))
> AssertionError: Iteration 6, acquisition pid 16676, release pid 27340
> 
> 
> Exception in thread Thread-5:
> Traceback (most recent call last):
>  File "/usr/lib/python2.6/threading.py", line 532, in
> __bootstrap_inner
>    self.run()
>  File "/usr/lib/python2.6/threading.py", line 484, in run
>    self.__target(*self.__args, **self.__kwargs)
>  File "sa_pg_advisory_locks.py", line 44, in run_test
>    raise AssertionError('Iteration %i, acquisition pid %i, release
> pid %i\n' % (i, pid, pid_))
> AssertionError: Iteration 12, acquisition pid 27340, release pid 16676
> 
> 
> Exception in thread Thread-7:
> Traceback (most recent call last):
>  File "/usr/lib/python2.6/threading.py", line 532, in
> __bootstrap_inner
>    self.run()
>  File "/usr/lib/python2.6/threading.py", line 484, in run
>    self.__target(*self.__args, **self.__kwargs)
>  File "sa_pg_advisory_locks.py", line 44, in run_test
>    raise AssertionError('Iteration %i, acquisition pid %i, release
> pid %i\n' % (i, pid, pid_))
> AssertionError: Iteration 10, acquisition pid 18248, release pid 8452
> 
> 
> Exception in thread Thread-3:
> Traceback (most recent call last):
>  File "/usr/lib/python2.6/threading.py", line 532, in
> __bootstrap_inner
>    self.run()
>  File "/usr/lib/python2.6/threading.py", line 484, in run
>    self.__target(*self.__args, **self.__kwargs)
>  File "sa_pg_advisory_locks.py", line 44, in run_test
>    raise AssertionError('Iteration %i, acquisition pid %i, release
> pid %i\n' % (i, pid, pid_))
> AssertionError: Iteration 18, acquisition pid 8452, release pid 18248
> 
> 
> 
> u...@host ~
> $ TEST_DSN=$JYTHON_DSN jython sa_pg_advisory_locks.py
> Will commit before releasing advisory lock
> Exception in thread Thread:Traceback (most recent call last):
>  File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in
> _Thread__bootstrap
>    self.run()
>  File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run
> Exception in thread Thread:Traceback (most recent call last):
>  File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in
> _Thread__bootstrap
>    self._target(*self._args, **self._kwargs)
>  File "sa_pg_advisory_locks.py", line 44, in run_test
>    raise AssertionError('Iteration %i, acquisition pid %i, release
> pid %i\n' % (i, pid, pid_))
> AssertionError: Iteration 2, acquisition pid 18076, release pid 11332
> 
> Exception in thread Thread:Traceback (most recent call last):
>  File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in
> _Thread__bootstrap
>    self.run()
>  File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run
>    self._target(*self._args, **self._kwargs)
>  File "sa_pg_advisory_locks.py", line 44, in run_test
>    self.run()
>  File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run
>    self._target(*self._args, **self._kwargs)
>  File "sa_pg_advisory_locks.py", line 44, in run_test
> Exception in thread Thread:Traceback (most recent call last):
>  File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in
> _Thread__bootstrap
>    self.run()
>  File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run
>    self._target(*self._args, **self._kwargs)
>  File "sa_pg_advisory_locks.py", line 44, in run_test
>    raise AssertionError('Iteration %i, acquisition pid %i, release
> pid %i\n' % (i, pid, pid_))
>    raise AssertionError('Iteration %i, acquisition pid %i, release
> pid %i\n' % (i, pid, pid_))
> AssertionError: Iteration 10, acquisition pid 11332, release pid 22572
> 
> AssertionError: Iteration 5, acquisition pid 22572, release pid 14376
> 
>    raise AssertionError('Iteration %i, acquisition pid %i, release
> pid %i\n' % (i, pid, pid_))
> AssertionError: Iteration 5, acquisition pid 14376, release pid 18076
> 
> 
> u...@host ~
> $ TEST_DSN=$CPYTHON_DSN python sa_pg_advisory_locks.py
> Will commit after releasing advisory lock
> 50
> 50
> 50
> 50
> 50
> 50
> 50
> 50
> 50
> 50
> 
> u...@host ~
> $ TEST_DSN=$JYTHON_DSN jython sa_pg_advisory_locks.py
> Will commit after releasing advisory lock
> 50
> 50
> 50
> 50
> 50
> 50
> 50
> 50
> 50
> 50
> 
> -- 
> You received this message because you are subscribed to the Google Groups 
> "sqlalchemy" group.
> To post to this group, send email to sqlalch...@googlegroups.com.
> To unsubscribe from this group, send email to 
> sqlalchemy+unsubscr...@googlegroups.com.
> For more options, visit this group at 
> http://groups.google.com/group/sqlalchemy?hl=en.
> 

-- 
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To post to this group, send email to sqlalch...@googlegroups.com.
To unsubscribe from this group, send email to 
sqlalchemy+unsubscr...@googlegroups.com.
For more options, visit this group at 
http://groups.google.com/group/sqlalchemy?hl=en.

Reply via email to