Hi,

I'm using PostgreSQL advisory locks in a multithreaded program. Worker
threads acquire locks with
session.execute(select([func.pg_advisory_lock(key)])) during a
transaction and release them just after  session.commit(). Sometimes
however, the connection behind the thread's session will have changed
after COMMIT, making it impossible for the thread to release the locks
it acquired. Note that I'm using scoped_session()'s class methods
everywhere.

A python script to reproduce the problem and its output are attached
below.

Is there a way to force the session to use the same connection within
a thread ? Would I then have to recycle the connections from time to
time ?

Thanks,

Julien

=== sa_pg_advisory_locks.py ==

# -*- coding: utf8 -*-
import random
import os
import threading
import time

from sqlalchemy import MetaData
from sqlalchemy.engine import create_engine
from sqlalchemy.orm import create_session, scoped_session,
sessionmaker, reconstructor
from sqlalchemy.sql import func, select

sa_engine = create_engine(os.environ['TEST_DSN'])

session = scoped_session(lambda: create_session(sa_engine,
autoflush=True, expire_on_commit=True, autocommit=False))

# Toggle this switch to see the difference in behaviour
COMMIT_BEFORE_LOCK_RELEASE = True
# COMMIT_BEFORE_LOCK_RELEASE = False

print 'Will commit %s releasing advisory lock' % ('before' if
COMMIT_BEFORE_LOCK_RELEASE else 'after')

# Synchronize program termination
event = threading.Event()

# Test function, will run concurrently in two threads
def run_test():
        try:
                i = 0
                while 1:
                        if event.isSet() or i >= 100:
                                break
                        # Show sign of life
                        if i and (i % 50 == 0):
                                print i
                        key = random.randint(1,2**16)
                        pid, _ = session.execute(select([func.pg_backend_pid(),
func.pg_advisory_lock(key)])).fetchone()
                        now = session.execute(select([func.now()])).scalar()
                        if COMMIT_BEFORE_LOCK_RELEASE:
                                session.commit()
                        pid_, unlocked = 
session.execute(select([func.pg_backend_pid(),
func.pg_advisory_unlock(key)])).fetchone()
                        if unlocked:
                                assert pid_ == pid
                        else:
                                raise AssertionError('Iteration %i, acquisition 
pid %i, release
pid %i\n' % (i, pid, pid_))
                        if not COMMIT_BEFORE_LOCK_RELEASE:
                                session.commit()
                        i += 1
        except Exception:
                event.set()
                raise
        event.set()

for i in xrange(10):
        thread = threading.Thread(target=run_test)
        thread.daemon = True
        thread.start()

event.wait()
time.sleep(1)

== output ==

u...@host ~
$ TEST_DSN=$CPYTHON_DSN python sa_pg_advisory_locks.py
Will commit before releasing advisory lock
Exception in thread Thread-10:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in
__bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "sa_pg_advisory_locks.py", line 44, in run_test
    raise AssertionError('Iteration %i, acquisition pid %i, release
pid %i\n' % (i, pid, pid_))
AssertionError: Iteration 6, acquisition pid 16676, release pid 27340


Exception in thread Thread-5:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in
__bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "sa_pg_advisory_locks.py", line 44, in run_test
    raise AssertionError('Iteration %i, acquisition pid %i, release
pid %i\n' % (i, pid, pid_))
AssertionError: Iteration 12, acquisition pid 27340, release pid 16676


Exception in thread Thread-7:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in
__bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "sa_pg_advisory_locks.py", line 44, in run_test
    raise AssertionError('Iteration %i, acquisition pid %i, release
pid %i\n' % (i, pid, pid_))
AssertionError: Iteration 10, acquisition pid 18248, release pid 8452


Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in
__bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "sa_pg_advisory_locks.py", line 44, in run_test
    raise AssertionError('Iteration %i, acquisition pid %i, release
pid %i\n' % (i, pid, pid_))
AssertionError: Iteration 18, acquisition pid 8452, release pid 18248



u...@host ~
$ TEST_DSN=$JYTHON_DSN jython sa_pg_advisory_locks.py
Will commit before releasing advisory lock
Exception in thread Thread:Traceback (most recent call last):
  File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in
_Thread__bootstrap
    self.run()
  File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run
Exception in thread Thread:Traceback (most recent call last):
  File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in
_Thread__bootstrap
    self._target(*self._args, **self._kwargs)
  File "sa_pg_advisory_locks.py", line 44, in run_test
    raise AssertionError('Iteration %i, acquisition pid %i, release
pid %i\n' % (i, pid, pid_))
AssertionError: Iteration 2, acquisition pid 18076, release pid 11332

Exception in thread Thread:Traceback (most recent call last):
  File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in
_Thread__bootstrap
    self.run()
  File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run
    self._target(*self._args, **self._kwargs)
  File "sa_pg_advisory_locks.py", line 44, in run_test
    self.run()
  File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run
    self._target(*self._args, **self._kwargs)
  File "sa_pg_advisory_locks.py", line 44, in run_test
Exception in thread Thread:Traceback (most recent call last):
  File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in
_Thread__bootstrap
    self.run()
  File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run
    self._target(*self._args, **self._kwargs)
  File "sa_pg_advisory_locks.py", line 44, in run_test
    raise AssertionError('Iteration %i, acquisition pid %i, release
pid %i\n' % (i, pid, pid_))
    raise AssertionError('Iteration %i, acquisition pid %i, release
pid %i\n' % (i, pid, pid_))
AssertionError: Iteration 10, acquisition pid 11332, release pid 22572

AssertionError: Iteration 5, acquisition pid 22572, release pid 14376

    raise AssertionError('Iteration %i, acquisition pid %i, release
pid %i\n' % (i, pid, pid_))
AssertionError: Iteration 5, acquisition pid 14376, release pid 18076


u...@host ~
$ TEST_DSN=$CPYTHON_DSN python sa_pg_advisory_locks.py
Will commit after releasing advisory lock
50
50
50
50
50
50
50
50
50
50

u...@host ~
$ TEST_DSN=$JYTHON_DSN jython sa_pg_advisory_locks.py
Will commit after releasing advisory lock
50
50
50
50
50
50
50
50
50
50

-- 
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To post to this group, send email to sqlalch...@googlegroups.com.
To unsubscribe from this group, send email to 
sqlalchemy+unsubscr...@googlegroups.com.
For more options, visit this group at 
http://groups.google.com/group/sqlalchemy?hl=en.

Reply via email to