[sqlalchemy] Session uses new connection after commit

2010-08-24 Thread Julien Demoor
Hi,

I'm using PostgreSQL advisory locks in a multithreaded program. Worker
threads acquire locks with
session.execute(select([func.pg_advisory_lock(key)])) during a
transaction and release them just after  session.commit(). Sometimes
however, the connection behind the thread's session will have changed
after COMMIT, making it impossible for the thread to release the locks
it acquired. Note that I'm using scoped_session()'s class methods
everywhere.

A python script to reproduce the problem and its output are attached
below.

Is there a way to force the session to use the same connection within
a thread? Would I then have to recycle the connections from time to
time?

Thanks,

Julien

=== sa_pg_advisory_locks.py ==

# -*- coding: utf8 -*-
"""Reproduction script: PostgreSQL advisory locks vs. session.commit().

Worker threads take an advisory lock through a scoped session, optionally
commit, then release the lock, and compare the backend pid seen when
acquiring with the one seen when releasing.

The database URL is read from the TEST_DSN environment variable.
"""
import random
import os
import threading
import time

from sqlalchemy import MetaData
from sqlalchemy.engine import create_engine
# The import list is parenthesised so that it can span two lines; a bare
# line break after the trailing comma is a SyntaxError.
from sqlalchemy.orm import (create_session, scoped_session,
                            sessionmaker, reconstructor)
from sqlalchemy.sql import func, select

sa_engine = create_engine(os.environ['TEST_DSN'])

# Scoped session registry; the worker threads call its methods directly
# instead of handling session instances themselves.
session = scoped_session(lambda: create_session(sa_engine,
    autoflush=True, expire_on_commit=True, autocommit=False))

# Toggle this switch to see the difference in behaviour
COMMIT_BEFORE_LOCK_RELEASE = True
# COMMIT_BEFORE_LOCK_RELEASE = False

# Single-argument call form: prints the same text under the Python 2 print
# statement and the Python 3 print function.
print('Will commit %s releasing advisory lock' % ('before' if
    COMMIT_BEFORE_LOCK_RELEASE else 'after'))

# Synchronize program termination
event = threading.Event()

# Test function; every worker thread started below runs it concurrently
def run_test():
    """Repeatedly acquire and release a PostgreSQL advisory lock.

    Each iteration picks a random key, takes the advisory lock for it
    through the module-level scoped ``session`` and records the backend pid
    that served the statement.  Depending on ``COMMIT_BEFORE_LOCK_RELEASE``
    the session is committed either before or after the lock is released.
    The release statement records the backend pid again.

    The loop stops after 100 iterations, or earlier when ``event`` has been
    set by another thread.  ``event`` is set on every exit path so the main
    thread stops waiting.

    Raises:
        AssertionError: if the unlock call reports failure (the message
            carries the iteration number and both pids), or if it succeeds
            but the two pids differ.
    """
    try:
        i = 0
        while 1:
            if event.isSet() or i >= 100:
                break
            # Show sign of life
            if i and (i % 50 == 0):
                print(i)
            key = random.randint(1, 2**16)
            # One statement returns both the serving backend's pid and the
            # result of taking the lock.
            pid, _ = session.execute(select([
                func.pg_backend_pid(),
                func.pg_advisory_lock(key)])).fetchone()
            # Extra statement inside the same transaction; its result is
            # not used.
            session.execute(select([func.now()])).scalar()
            if COMMIT_BEFORE_LOCK_RELEASE:
                session.commit()
            pid_, unlocked = session.execute(select([
                func.pg_backend_pid(),
                func.pg_advisory_unlock(key)])).fetchone()
            if unlocked:
                assert pid_ == pid
            else:
                raise AssertionError(
                    'Iteration %i, acquisition pid %i, release pid %i\n'
                    % (i, pid, pid_))
            if not COMMIT_BEFORE_LOCK_RELEASE:
                session.commit()
            i += 1
    except Exception:
        # Wake the main thread before propagating the failure.
        event.set()
        raise
    event.set()

# Start ten workers as daemon threads so they cannot keep the process
# alive once the main thread returns.
for _ in range(10):
    thread = threading.Thread(target=run_test)
    thread.daemon = True
    thread.start()

# Block until a worker sets the event (on completion or on failure).
event.wait()
# Presumably leaves the remaining threads time to print their tracebacks
# before the process exits.
time.sleep(1)

== output ==

u...@host ~
$ TEST_DSN=$CPYTHON_DSN python sa_pg_advisory_locks.py
Will commit before releasing advisory lock
Exception in thread Thread-10:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "sa_pg_advisory_locks.py", line 44, in run_test
    raise AssertionError('Iteration %i, acquisition pid %i, release pid %i\n' % (i, pid, pid_))
AssertionError: Iteration 6, acquisition pid 16676, release pid 27340


Exception in thread Thread-5:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "sa_pg_advisory_locks.py", line 44, in run_test
    raise AssertionError('Iteration %i, acquisition pid %i, release pid %i\n' % (i, pid, pid_))
AssertionError: Iteration 12, acquisition pid 27340, release pid 16676


Exception in thread Thread-7:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "sa_pg_advisory_locks.py", line 44, in run_test
    raise AssertionError('Iteration %i, acquisition pid %i, release pid %i\n' % (i, pid, pid_))
AssertionError: Iteration 10, acquisition pid 18248, release pid 8452


Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "sa_pg_advisory_locks.py", line 44, in run_test
    raise AssertionError('Iteration %i, acquisition pid %i, release pid %i\n' % (i, pid, pid_))
AssertionError: Iteration 18, acquisition pid 8452, release pid 18248



u...@host ~
$ 

Re: [sqlalchemy] Session uses new connection after commit

2010-08-24 Thread Michael Bayer

On Aug 24, 2010, at 6:12 AM, Julien Demoor wrote:

 Hi,
 
 I'm using PostgreSQL advisory locks in a multithreaded program. Worker
 threads acquire locks with
 session.execute(select([func.pg_advisory_lock(key)])) during a
 transaction and release them just after  session.commit(). Sometimes
 however, the connection behind the thread's session will have changed
 after COMMIT, making it impossible for the thread to release the locks
 it acquired. Note that I'm using scoped_session()'s class methods
 everywhere.
 
 A python script to reproduce the problem and its output are attached
 below.
 
 Is there a way to force the session to use the same connection within
 a thread ? Would I then have to recycle the connections from time to
 time ?

I think the most direct way is to bind the Session to a specific connection:

conn = engine.connect()
sess = Session(bind=conn)


then when the scope of work with the connection is complete:

sess.close()
conn.close()

The indirect way would be to play games with the connection pool, but in this
case your application should already have explicit boundaries where you'd like
this connection to stay in play, so the above approach is straightforward.



 
 Thanks,
 
 Julien
 
 === sa_pg_advisory_locks.py ==
 
 # -*- coding: utf8 -*-
 import random
 import os
 import threading
 import time
 
 from sqlalchemy import MetaData
 from sqlalchemy.engine import create_engine
 from sqlalchemy.orm import create_session, scoped_session,
 sessionmaker, reconstructor
 from sqlalchemy.sql import func, select
 
 sa_engine = create_engine(os.environ['TEST_DSN'])
 
 session = scoped_session(lambda: create_session(sa_engine,
 autoflush=True, expire_on_commit=True, autocommit=False))
 
 # Toggle this switch to see the difference in behaviour
 COMMIT_BEFORE_LOCK_RELEASE = True
 # COMMIT_BEFORE_LOCK_RELEASE = False
 
 print 'Will commit %s releasing advisory lock' % ('before' if
 COMMIT_BEFORE_LOCK_RELEASE else 'after')
 
 # Synchronize program termination
 event = threading.Event()
 
 # Test function, will run concurrently in two threads
 def run_test():
   try:
   i = 0
   while 1:
   if event.isSet() or i = 100:
   break
   # Show sign of life
   if i and (i % 50 == 0):
   print i
   key = random.randint(1,2**16)
   pid, _ = session.execute(select([func.pg_backend_pid(),
 func.pg_advisory_lock(key)])).fetchone()
   now = session.execute(select([func.now()])).scalar()
   if COMMIT_BEFORE_LOCK_RELEASE:
   session.commit()
   pid_, unlocked = 
 session.execute(select([func.pg_backend_pid(),
 func.pg_advisory_unlock(key)])).fetchone()
   if unlocked:
   assert pid_ == pid
   else:
   raise AssertionError('Iteration %i, acquisition 
 pid %i, release
 pid %i\n' % (i, pid, pid_))
   if not COMMIT_BEFORE_LOCK_RELEASE:
   session.commit()
   i += 1
   except Exception:
   event.set()
   raise
   event.set()
 
 for i in xrange(10):
   thread = threading.Thread(target=run_test)
   thread.daemon = True
   thread.start()
 
 event.wait()
 time.sleep(1)
 
 == output ==
 
 u...@host ~
 $ TEST_DSN=$CPYTHON_DSN python sa_pg_advisory_locks.py
 Will commit before releasing advisory lock
 Exception in thread Thread-10:
 Traceback (most recent call last):
  File /usr/lib/python2.6/threading.py, line 532, in
 __bootstrap_inner
self.run()
  File /usr/lib/python2.6/threading.py, line 484, in run
self.__target(*self.__args, **self.__kwargs)
  File sa_pg_advisory_locks.py, line 44, in run_test
raise AssertionError('Iteration %i, acquisition pid %i, release
 pid %i\n' % (i, pid, pid_))
 AssertionError: Iteration 6, acquisition pid 16676, release pid 27340
 
 
 Exception in thread Thread-5:
 Traceback (most recent call last):
  File /usr/lib/python2.6/threading.py, line 532, in
 __bootstrap_inner
self.run()
  File /usr/lib/python2.6/threading.py, line 484, in run
self.__target(*self.__args, **self.__kwargs)
  File sa_pg_advisory_locks.py, line 44, in run_test
raise AssertionError('Iteration %i, acquisition pid %i, release
 pid %i\n' % (i, pid, pid_))
 AssertionError: Iteration 12, acquisition pid 27340, release pid 16676
 
 
 Exception in thread Thread-7:
 Traceback (most recent call last):
  File /usr/lib/python2.6/threading.py, line 532, in
 __bootstrap_inner
self.run()
  File /usr/lib/python2.6/threading.py, line 484, in run
self.__target(*self.__args, **self.__kwargs)
  File sa_pg_advisory_locks.py, line 44, in run_test
raise AssertionError('Iteration %i, acquisition pid %i, release
 pid %i\n' % (i, pid, pid_))