On Tue, May 28, 2019, at 9:10 AM, Chris Wilson wrote:
> Dear Michael,
>
> I have discovered a limitation of TypeDecorators (custom column types): any
> one that uses the database (e.g. to load objects serialised in a custom way)
> has no way to know which database session to use. During initial load one can
> use a global session object, but expired attributes may need to be refreshed
> at any time, when the current session is not necessarily the one that loaded
> the attribute, or its parent object.
>
> For example, consider a TypeDecorator that stores a large number of Cats by
> ID, compressed to save space in the database:
>
> import struct
> import zlib
>
> from sqlalchemy import Column, Integer, LargeBinary, Text, TypeDecorator, create_engine
> from sqlalchemy.ext.declarative import declarative_base
> from sqlalchemy.orm import object_session, sessionmaker
>
> Base = declarative_base()
> current_session = None
>
>
> class Cat(Base):
>     __tablename__ = 'cat'
>     id = Column(Integer, primary_key=True)
>     name = Column(Text)
>
>
> class CompressedCats(TypeDecorator):
>     ''' A house can have so many cats that we need to compress them. '''
>
>     impl = LargeBinary
>     _struct_protocol_id = struct.Struct('<i')
>
>     def process_bind_param(self, value, _dialect):
>         # Typically the column can be nullable, so you can write None to it: support that.
>         if value is None:
>             return None
>
>         # FIXME: allow specification of different protocol id, and
>         # use the id to determine the protocol argument to pass to serialize.
>         return zlib.compress(b''.join(self._struct_protocol_id.pack(cat.id) for cat in value))
>
>     def process_result_value(self, value, _dialect):
>         # Typically the column can be nullable, so you can read None from it: support that.
>         if value is None:
>             return None
>
>         # FIXME: unpack and use protocol id to determine protocol argument to pass to deserialize.
>         decompressed = zlib.decompress(value)
>         cat_ids = [self._struct_protocol_id.unpack(decompressed[i:i + 4])[0]
>                    for i in range(0, len(decompressed), 4)]
>         # BUG: Which session can we use here?
>         global current_session
>         return [current_session.query(Cat).filter_by(id=cat_id).one() for cat_id in cat_ids]
>
>
> class House(Base):
>     __tablename__ = 'house'
>
>     id = Column(Integer, primary_key=True)
>     # A house can have so many cats that we need to compress them:
>     cats = Column(CompressedCats)
>
> engine = create_engine('sqlite://')
> engine.echo = True
> Base.metadata.create_all(engine)
>
> DBSession = sessionmaker(bind=engine)
> session_1 = DBSession(autocommit=True, expire_on_commit=True)
>
> with session_1.begin():
>     current_session = session_1
>     with session_1.begin_nested():
>         cats = [Cat(id=i, name=f'Cat #{i}') for i in range(100)]
>         session_1.add_all(cats)
>         house = House(cats=cats)
>         session_1.add(house)
>
> session_2 = DBSession(autocommit=True)
> with session_2.begin():
>     current_session = session_2
>     assert len(house.cats) == 100
>     assert object_session(house.cats[0]) is session_1  # fails because the Cats were loaded from session_2 instead
>
>     # We don't even need to do this to trigger the bug, unless we construct
>     # session_1 with expire_on_commit=False
>     session_1.expire(house, ['cats'])
>     assert len(house.cats) == 100
>     assert object_session(house.cats[0]) is session_1
>
> At this point, house.cats needs to be reloaded, but if we use the current
> session (session_2) then it will be incorrect.
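
As a side note, the byte layout that CompressedCats reads and writes can be checked without a database or a session. The following is only a minimal sketch of that round trip: pack_ids and unpack_ids are hypothetical helper names, not part of the program above, and the sketch assumes every ID fits in a signed 32-bit integer, as the '<i' format implies.

import struct
import zlib

# Same little-endian signed 32-bit layout as _struct_protocol_id above.
ID_STRUCT = struct.Struct("<i")


def pack_ids(ids):
    """Hypothetical helper: concatenate the packed IDs, then compress them."""
    return zlib.compress(b"".join(ID_STRUCT.pack(i) for i in ids))


def unpack_ids(blob):
    """Hypothetical helper: decompress, then read the IDs back four bytes at a time."""
    return [cat_id for (cat_id,) in ID_STRUCT.iter_unpack(zlib.decompress(blob))]


ids = list(range(100))
blob = pack_ids(ids)
round_tripped = unpack_ids(blob)

The decoding half needs no session at all; the session only matters at the point where the IDs are turned back into Cat objects.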

Here's how to attach the correct Session to a threading local within the scope of the execution of each statement:

import threading

from sqlalchemy import create_engine
from sqlalchemy import event
from sqlalchemy.orm import Session

global_session = threading.local()


# Remember which Session owns each connection when its transaction begins.
@event.listens_for(Session, "after_begin")
def attach_session_to_connection(session, transaction, connection):
    connection.info["session"] = session


engine = create_engine("sqlite://")


# Before each statement, publish that Session for the current thread.
@event.listens_for(engine, "before_execute")
def attach_session(conn, clauseelement, multiparams, params):
    if "session" in conn.info:
        global_session.session = conn.info["session"]

then inside your type handler:

current_session = global_session.session
return [
    current_session.query(Cat).filter_by(id=cat_id).one()
    for cat_id in cat_ids
]

Below is your program above passing its test using this approach. I hope this solves your problem in a simple way:

import struct
import threading
import zlib

from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import event
from sqlalchemy import Integer
from sqlalchemy import LargeBinary
from sqlalchemy import Text
from sqlalchemy import TypeDecorator
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import object_session
from sqlalchemy.orm import Session
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

global_session = threading.local()


@event.listens_for(Session, "after_begin")
def attach_session_to_connection(session, transaction, connection):
    connection.info["session"] = session


engine = create_engine("sqlite://")


@event.listens_for(engine, "before_execute")
def attach_session(conn, clauseelement, multiparams, params):
    if "session" in conn.info:
        global_session.session = conn.info["session"]


class Cat(Base):
    __tablename__ = "cat"
    id = Column(Integer, primary_key=True)
    name = Column(Text)


class CompressedCats(TypeDecorator):
    """ A house can have so many cats that we need to compress them. """

    impl = LargeBinary
    _struct_protocol_id = struct.Struct("<i")

    def process_bind_param(self, value, _dialect):
        if value is None:
            return None

        return zlib.compress(
            b"".join(self._struct_protocol_id.pack(cat.id) for cat in value)
        )

    def process_result_value(self, value, _dialect):
        if value is None:
            return None

        decompressed = zlib.decompress(value)
        cat_ids = [
            self._struct_protocol_id.unpack(decompressed[i : i + 4])[0]
            for i in range(0, len(decompressed), 4)
        ]
        # Use the Session that issued the statement currently being executed.
        current_session = global_session.session
        return [
            current_session.query(Cat).filter_by(id=cat_id).one()
            for cat_id in cat_ids
        ]


class House(Base):
    __tablename__ = "house"

    id = Column(Integer, primary_key=True)
    cats = Column(CompressedCats)


engine.echo = True
Base.metadata.create_all(engine)

DBSession = sessionmaker(bind=engine)
session_1 = DBSession(autocommit=True, expire_on_commit=True)

with session_1.begin():
    current_session = session_1
    with session_1.begin_nested():
        cats = [Cat(id=i, name=f"Cat #{i}") for i in range(100)]
        session_1.add_all(cats)
        house = House(cats=cats)
        session_1.add(house)

session_2 = DBSession(autocommit=True)
with session_2.begin():
    current_session = session_2
    assert len(house.cats) == 100
    assert (
        object_session(house.cats[0]) is session_1
    )  # failed before this change, because the Cats were loaded from session_2

    session_1.expire(house, ["cats"])
    assert len(house.cats) == 100
    assert object_session(house.cats[0]) is session_1

>
> Arguably this is the fault of the TypeDecorator in question, but it seems
> impossible to write one which is correct, because the session is not passed
> to process_result_value. _populate_full has access to it (in state.session),
> but does not pass it to getter (TypeDecorator.result_processor).
>
> Please let me know if you can think of a solution or workaround.
>
> Thanks, Chris.
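
The hand-off between the before_execute listener and the type handler relies only on threading.local, so that part can be tried in isolation. What follows is a rough sketch under stated assumptions, not SQLAlchemy code: before_execute and process_result_value here are plain stand-in functions, and the "sessions" are just strings. The barrier makes both threads write before either one reads, which is the case where a plain module-level global would hand one thread the other thread's session.

import threading

# Stand-in for the thread-local used in the listeners above.
global_session = threading.local()

# Both workers must reach the barrier before either is allowed to read.
barrier = threading.Barrier(2)
results = {}


def before_execute(session_name):
    """Stand-in for the listener: record the session for this thread only."""
    global_session.session = session_name


def process_result_value():
    """Stand-in for the type handler: read what this thread's hook recorded."""
    return global_session.session


def worker(session_name):
    before_execute(session_name)
    barrier.wait()  # the other thread has now written its own value too
    results[session_name] = process_result_value()


threads = [
    threading.Thread(target=worker, args=(name,))
    for name in ("session_1", "session_2")
]
for t in threads:
    t.start()
for t in threads:
    t.join()

Each worker reads back its own value, and the main thread, which never ran the hook, has no session attribute at all. Within a single thread the value is simply whatever the most recent statement set, which is why the listener runs before every execute.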

-- 
SQLAlchemy - 
The Python SQL Toolkit and Object Relational Mapper

http://www.sqlalchemy.org/

To post example code, please provide an MCVE: Minimal, Complete, and Verifiable Example. See http://stackoverflow.com/help/mcve for a full description.
--- 
You received this message because you are subscribed to the Google Groups "sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at https://groups.google.com/group/sqlalchemy.
To view this discussion on the web visit https://groups.google.com/d/msgid/sqlalchemy/d152c4f1-bb55-4d96-878b-8d418b4c1058%40www.fastmail.com.
For more options, visit https://groups.google.com/d/optout.