[sqlalchemy] Reproducible oddity in with_for_update()

2015-06-15 Thread Brian Candler
I have an issue which I have boiled down to a full test case below. This 
test program reproduces the problem with both sqlalchemy 0.9.9 and 1.0.5, 
under python 2.7.6 and ubuntu 14.04, and PyMySQL-0.6.2.

There is a combination of circumstances:

1. After you rollback a session, touching any attribute of an object (even 
just accessing its id) causes the whole object to be re-read from the 
database. That's OK.
2. Reading the object again using a new query and with_for_update() 
generates a fresh query with SELECT .. FOR UPDATE. This is what I expect. 
It also correctly blocks if another client has the row locked.
3. However, once the query has completed, the data seen in the object 
appears to be the value read from the previous query, not the SELECT .. FOR 
UPDATE one.

In the test program, a database object is created with val=abc. Two 
threads both read the row under a lock, append X and write it back again. 
So the final answer should be abcXX, but in fact it's abcX.

Points to note:

- this has to be run on a proper database (I am using mysql). sqlite 
doesn't support SELECT .. FOR UPDATE.

- I have some workarounds. If instead of reading a new object I do 
db.refresh(v, lockmode='update') then all is fine. However I understood 
that the lockmode=string interface is being deprecated.

Similarly, if I discard the object using db.expire(v) before reading it 
again then it also works correctly. But in any case, I'd like to understand 
why it doesn't work to fetch the new object in the way I am, and I suspect 
a bug. Surely if SQLAlchemy has just issued a SELECT .. FOR UPDATE then the 
object should be updated with the values of that SELECT?

Regards,

Brian.

-
from __future__ import absolute_import, division, print_function, unicode_literals
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

from contextlib import contextmanager
from six.moves.queue import Queue, Empty
from threading import Thread

DEFAULT_DB_URI = 'mysql+pymysql://root@localhost/testdb'

Base = declarative_base()

class Foo(Base):
    __tablename__ = 'foo'
    id = Column(Integer, primary_key=True)
    val = Column(String(255))

engine = create_engine(DEFAULT_DB_URI, echo=True)
try: Base.metadata.drop_all(engine)
except: pass
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

@contextmanager
def private_session():
    s = Session()
    try:
        yield s
    finally:
        s.rollback()
        s.close()

def runner(ref, omsg, imsg):
    with private_session() as db:
        print("Read object")
        v = db.query(Foo).filter_by(id=ref).one()
        print("Discard session")
        db.rollback()
        print("Get object's id")
        id = v.id
        print("Reload object with FOR UPDATE")
        # db.expire(v)
        v = db.query(Foo).filter_by(id=id).with_for_update().one()
        # Alt: db.refresh(v, lockmode='update')
        print("v.val=%r" % v.val)
        omsg.put("started")
        imsg.get()
        v.val += "X"
        db.commit()

with private_session() as db:
    f = Foo(id=1, val="abc")
    db.add(f)
    db.commit()

    o1 = Queue()
    i1 = Queue()
    o2 = Queue()
    i2 = Queue()

    t1 = Thread(target=runner, kwargs={"ref": 1, "omsg": o1, "imsg": i1})
    t2 = Thread(target=runner, kwargs={"ref": 1, "omsg": o2, "imsg": i2})

    t1.start()
    assert o1.get(True, 1) == "started"
    # Next thread should block on SELECT FOR UPDATE
    t2.start()
    try:
        o2.get(True, 1)
        raise RuntimeError("This thread should be blocked on SELECT FOR UPDATE")
    except Empty:
        pass
    # Let first thread complete
    i1.put("go")
    # Now second thread is unblocked
    assert o2.get(True, 1) == "started"
    i2.put("go")

    t1.join(2)
    assert not t1.is_alive()
    t2.join(2)
    assert not t2.is_alive()

    # Check final state
    print("*** FINISHED ***")
    id = f.id
    print("*** RESULTS ***")
    print("id=%d" % f.id)
    print("val=%r" % f.val)

Base.metadata.drop_all(engine)

-- 
You received this message because you are subscribed to the Google Groups 
sqlalchemy group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at http://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.


Re: [sqlalchemy] Reproducible oddity in with_for_update()

2015-06-15 Thread Mike Bayer



On 6/15/15 3:01 PM, Brian Candler wrote:
> I have an issue which I have boiled down to a full test case below. 
> This test program reproduces the problem with both sqlalchemy 0.9.9 
> and 1.0.5, under python 2.7.6 and ubuntu 14.04, and PyMySQL-0.6.2.
>
> There is a combination of circumstances:
>
> 1. After you rollback a session, touching any attribute of an object 
> (even just accessing its id) causes the whole object to be re-read 
> from the database. That's OK.
> 2. Reading the object again using a new query and with_for_update() 
> generates a fresh query with SELECT .. FOR UPDATE. This is what I 
> expect. It also correctly blocks if another client has the row locked.
> 3. However, once the query has completed, the data seen in the object 
> appears to be the value read from the previous query, not the SELECT 
> .. FOR UPDATE one.


Either run session.expire_all() or session.expire(some_object) ahead of 
time, or run the query with the populate_existing() method:


http://docs.sqlalchemy.org/en/rel_1_0/orm/query.html?highlight=populate_existing#sqlalchemy.orm.query.Query.populate_existing
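
A minimal sketch of both suggestions follows. It is an illustration under stated assumptions, not the test program from the original post: it assumes SQLAlchemy 1.4 or later (for sqlalchemy.orm.declarative_base) and uses an in-memory SQLite database so that it runs without a server. SQLite does not lock rows and, as far as I know, its dialect simply omits the FOR UPDATE clause, so only the identity-map behaviour is shown. A raw UPDATE issued through text() stands in for the other client's commit. A plain re-query hands back the object already held by the session without overwriting its loaded attributes, whereas populate_existing() or a prior expire() lets the fetched row win.

```python
from sqlalchemy import Column, Integer, String, create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Foo(Base):
    __tablename__ = "foo"
    id = Column(Integer, primary_key=True)
    val = Column(String(255))


# Assumption: in-memory SQLite, chosen only so the sketch is self-contained.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

db = Session()
db.add(Foo(id=1, val="abc"))
db.commit()

# Load the object; its attributes are now populated in the session.
v = db.query(Foo).filter_by(id=1).one()

# Change the row without going through the ORM object. This stands in
# for another client committing a new value.
db.execute(text("UPDATE foo SET val = 'abcX' WHERE id = 1"))

# Plain re-query: the same object comes back and its loaded attributes
# are left as they were, even though the SELECT fetched the new value.
same = db.query(Foo).filter_by(id=1).with_for_update().one()
stale_val = same.val

# Option 1: populate_existing() overwrites loaded attributes with the
# values from the row that was just fetched.
fresh = (
    db.query(Foo)
    .filter_by(id=1)
    .populate_existing()
    .with_for_update()
    .one()
)
fresh_val = fresh.val

# Option 2: expire the object first, so the next load repopulates it.
db.execute(text("UPDATE foo SET val = 'abcXX' WHERE id = 1"))
db.expire(v)
expired_val = db.query(Foo).filter_by(id=1).with_for_update().one().val

print(stale_val, fresh_val, expired_val)
db.close()
```

On a database that does lock rows, combining populate_existing() with with_for_update() means the values seen in the object are the ones read under the lock.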





