On 6/15/15 3:01 PM, Brian Candler wrote:
I have an issue which I have boiled down to a full test case below. This test program reproduces the problem with both SQLAlchemy 0.9.9 and 1.0.5, under Python 2.7.6 on Ubuntu 14.04, with PyMySQL 0.6.2.

There is a combination of circumstances:

1. After you roll back a session, touching any attribute of an object (even just accessing its "id") causes the whole object to be re-read from the database. That's OK.

2. Reading the object again using a new query and with_for_update() generates a fresh query with "SELECT .. FOR UPDATE". This is what I expect. It also correctly blocks if another client has the row locked.

3. However, once the query has completed, the data seen in the object appears to be the value read from the previous query, not the SELECT .. FOR UPDATE one.

Either run session.expire_all() or session.expire(some_object) ahead of time, or run the query with the populate_existing() method:

http://docs.sqlalchemy.org/en/rel_1_0/orm/query.html?highlight=populate_existing#sqlalchemy.orm.query.Query.populate_existing
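
A minimal sketch of both suggestions, run against an in-memory SQLite database so that it needs no server. SQLite has no SELECT .. FOR UPDATE, so the lock is left out here, and a raw UPDATE issued through the same session stands in for the other client. The Foo model is copied from the test program; the variable names and the SQLite URL are assumptions made for illustration only.

```python
from sqlalchemy import create_engine, text, Column, Integer, String
from sqlalchemy.orm import sessionmaker

try:
    # Newer SQLAlchemy releases expose declarative_base here ...
    from sqlalchemy.orm import declarative_base
except ImportError:
    # ... older ones (such as 0.9 / 1.0) only here.
    from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class Foo(Base):
    __tablename__ = "foo"
    id = Column(Integer, primary_key=True)
    val = Column(String(255))


# Assumption: in-memory SQLite instead of MySQL, purely for the demo.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
db = sessionmaker(bind=engine)()

db.add(Foo(id=1, val="abc"))
db.commit()

# Load the object; its attributes are now populated in the session.
v = db.query(Foo).filter_by(id=1).one()

# Change the row behind the ORM's back (stand-in for another client).
db.execute(text("UPDATE foo SET val = 'abcX' WHERE id = 1"))

# A plain re-query returns the same object with its loaded value untouched.
stale = db.query(Foo).filter_by(id=1).one().val

# populate_existing() overwrites the loaded attributes from the new row.
fresh = db.query(Foo).filter_by(id=1).populate_existing().one().val

# Alternative: expire the object first, so the next access reloads it.
db.execute(text("UPDATE foo SET val = 'abcXX' WHERE id = 1"))
db.expire(v)
after_expire = v.val

print(stale, fresh, after_expire)
db.close()
```

On MySQL the same fix would presumably be written as db.query(Foo).filter_by(id=id).with_for_update().populate_existing().one(), or as db.expire(v) followed by the original with_for_update() query.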





In the test program, a database object is created with val="abc". Two threads both read the row under a lock, append X and write it back again. So the final answer should be abcXX, but in fact it's abcX.

Points to note:

- this has to be run on a proper database (I am using MySQL). SQLite doesn't support SELECT .. FOR UPDATE.

- I have some workarounds. If instead of reading a new object I do db.refresh(v, lockmode="update") then all is fine. However I understood that the lockmode="string" interface is being deprecated.

Similarly, if I discard the object using db.expire(v) before reading it again then it also works correctly. But in any case, I'd like to understand why it doesn't work to fetch the new object in the way I am, and I suspect a bug. Surely if SQLAlchemy has just issued a SELECT .. FOR UPDATE then the object should be updated with the values of that SELECT?
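
A likely explanation for what the test program shows is the session's identity map: a query always sends its SELECT, but when the row's primary key matches an object that is already loaded and not expired, that object is returned with its existing attribute values. The toy model below illustrates the idea with the standard library only. It is not SQLAlchemy's implementation; Row, ToySession and their methods are hypothetical names, and the populate_existing flag is simply named after the Query method mentioned in the reply.

```python
import sqlite3


class Row(object):
    """Hypothetical stand-in for a mapped object."""

    def __init__(self, id, val):
        self.id = id
        self.val = val
        self.expired = False


class ToySession(object):
    """Toy identity map: at most one Row object per primary key."""

    def __init__(self, conn):
        self.conn = conn
        self.identity_map = {}

    def get(self, id, populate_existing=False):
        # The SELECT is always sent to the database ...
        row = self.conn.execute(
            "SELECT id, val FROM foo WHERE id = ?", (id,)).fetchone()
        obj = self.identity_map.get(row[0])
        if obj is None:
            # First time this primary key is seen: build a new object.
            obj = self.identity_map[row[0]] = Row(row[0], row[1])
        elif obj.expired or populate_existing:
            # Only an expired object, or an explicit request, is overwritten.
            obj.val = row[1]
            obj.expired = False
        # ... otherwise the loaded object is returned unchanged.
        return obj

    def expire(self, obj):
        obj.expired = True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE foo (id INTEGER PRIMARY KEY, val TEXT)")
conn.execute("INSERT INTO foo VALUES (1, 'abc')")

db = ToySession(conn)
v = db.get(1)

# Stand-in for the other thread's committed change.
conn.execute("UPDATE foo SET val = 'abcX' WHERE id = 1")

stale = db.get(1).val                          # row ignored, old value kept
fresh = db.get(1, populate_existing=True).val  # row applied to the object
same_object = db.get(1) is v                   # identity is preserved

conn.execute("UPDATE foo SET val = 'abcXX' WHERE id = 1")
db.expire(v)
after_expire = db.get(1).val                   # expired, so row is applied
```

In this model the second thread's "abcX" result corresponds to the stale case: the locked SELECT returns the new row, but the object touched just before (by reading its id) is already loaded, so its val is left alone.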

Regards,

Brian.

-----------------
from __future__ import absolute_import, division, print_function, unicode_literals
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

from contextlib import contextmanager
from six.moves.queue import Queue, Empty
from threading import Thread

DEFAULT_DB_URI = 'mysql+pymysql://root@localhost/testdb'

Base = declarative_base()

class Foo(Base):
    __tablename__ = "foo"
    id = Column(Integer, primary_key=True)
    val = Column(String(255))

engine = create_engine(DEFAULT_DB_URI, echo=True)
# Start from a clean schema; ignore errors from a missing table.
try:
    Base.metadata.drop_all(engine)
except Exception:
    pass
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

@contextmanager
def private_session():
    s = Session()
    try:
        yield s
    finally:
        s.rollback()
        s.close()

def runner(ref, omsg, imsg):
    with private_session() as db:
        print("<<<< Read object")
        v = db.query(Foo).filter_by(id=ref).one()
        print("---- Discard session")
        db.rollback()
        print(">>>> Get object's id")
        id = v.id
        print("!!!! Reload object with FOR UPDATE")
        # db.expire(v)
        v = db.query(Foo).filter_by(id=id).with_for_update().one()
        # Alt: db.refresh(v, lockmode='update')
        print("==== v.val=%r" % v.val)
        omsg.put("started")
        imsg.get()
        v.val += "X"
        db.commit()

with private_session() as db:
    f = Foo(id=1, val="abc")
    db.add(f)
    db.commit()

    o1 = Queue()
    i1 = Queue()
    o2 = Queue()
    i2 = Queue()

    t1 = Thread(target=runner, kwargs={"ref":1, "omsg": o1, "imsg": i1})
    t2 = Thread(target=runner, kwargs={"ref":1, "omsg": o2, "imsg": i2})

    t1.start()
    assert o1.get(True, 1) == "started"
    # Next thread should block on SELECT FOR UPDATE
    t2.start()
    try:
        o2.get(True, 1)
        raise RuntimeError("This thread should be blocked on SELECT FOR UPDATE")
    except Empty:
        pass
    # Let first thread complete
    i1.put("go")
    # Now second thread is unblocked
    assert o2.get(True, 1) == "started"
    i2.put("go")

    t1.join(2)
    assert not t1.is_alive()
    t2.join(2)
    assert not t2.is_alive()

    # Check final state
    print("*** FINISHED ***")
    id = f.id
    print("*** RESULTS ***")
    print("id=%d" % f.id)
    print("val=%r" % f.val)

Base.metadata.drop_all(engine)

--
You received this message because you are subscribed to the Google Groups "sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at http://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.
