On Sep 3, 2013, at 8:47 AM, Paul Balomiri <paulbalom...@gmail.com> wrote:
> I would like to install
>
> event.listen(list, 'append', append_listener)
> event.listen(list, 'remove', rm_listener)
>
> on those lists, such that the GroupByKeyCollection can modify added objects
> according to the relationship it implements:
> * set the appropriate foreign key constraints
> * insert a removed object with it's new value for the key attribute after a
>   change (announced by append_listener)
> * reset the fks upon item removal.

Using event.listen with GBK doesn't make sense. Events can only be used with specific target types; the "remove" and "append" events only apply to an ORM-produced InstrumentedAttribute, such as Person._addresses_by_role here (note, we mean the class-bound attribute, not the collection on an instance). There is no need to use event.listen with the collection itself, as remove/append are produced originally by the add()/remove() methods on GBK itself; any extra logic which should take place would be invoked directly from there (and in fact my original example fails to fire off the event with remove()).

Additionally, all the usage of prepare_instrumentation() etc. should not be necessary; that's all internal stuff which is called automatically.

As mentioned before, the behavior of this collection is completely outside the realm of a "normal" collection, so it needs to implement the append/remove events directly, which isn't something a new user to SQLAlchemy would typically be able to handle without a much deeper understanding of how the attribute system works.

I've implemented your test case as below, as well as some other variants, in association with the original code I gave you - for the "remove" case I've added the necessary code to the custom collection. All foreign key constraints are set correctly as a function of the ORM's normal operation, and as far as "reset" goes, when an association between Person and Address is removed, we want to just delete the association, so cascade is used for that.
# I'm not sure what "insert a removed object with it's new value for the key
# attribute after a change" means; add a test to the TestPA class illustrating
# the behavior you want and I'll add it.

from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base
# Imported after the wildcard imports above (as in the original script) so a
# wildcard-exported name can never shadow the stdlib module.
import collections
import unittest

from sqlalchemy.orm.collections import collection, collection_adapter
from sqlalchemy.ext.associationproxy import (
    association_proxy,
    _AssociationCollection,
)


class GroupByKeyCollection(collections.defaultdict):
    """ORM collection class that groups its members into lists by key.

    The collection is a ``defaultdict(list)``; each member is stored in the
    list found under ``keyfunc(member)``.
    """

    def __init__(self, keyfunc):
        """Create an empty collection.

        :param keyfunc: callable taking a member and returning the key of
            the list the member belongs to.
        """
        super(GroupByKeyCollection, self).__init__(list)
        self.keyfunc = keyfunc

    @collection.appender
    def add(self, value, _sa_initiator=None):
        """Append ``value`` to the list stored under ``keyfunc(value)``."""
        key = self.keyfunc(value)
        self[key].append(value)

    @collection.remover
    def remove(self, value, _sa_initiator=None):
        """Fire the remove event for ``value``, then drop it from its list.

        Raises ``ValueError`` (from ``list.remove``) if ``value`` is not in
        the list stored under its key.
        """
        key = self.keyfunc(value)
        adapter = collection_adapter(self)
        adapter.fire_remove_event(value, None)
        self[key].remove(value)

    @collection.internally_instrumented
    def __setitem__(self, key, value):
        """Store the iterable ``value`` under ``key``.

        An append event is fired for every item of ``value`` first.
        """
        adapter = collection_adapter(self)
        # the collection API usually provides these events transparently,
        # but due to the unusual structure, we pretty much have to fire them
        # ourselves for each item.
        for item in value:
            # The event's return value was never used by the original code
            # (``value`` is stored as given), so it is not bound here.
            adapter.fire_append_event(item, None)
        collections.defaultdict.__setitem__(self, key, value)

    @collection.internally_instrumented
    def __delitem__(self, key):
        """Delete the list stored under ``key``.

        A remove event is fired for every item of that list first.  Raises
        ``KeyError`` if ``key`` is not present.
        """
        adapter = collection_adapter(self)
        # dict.get() does not go through defaultdict.__missing__, so looking
        # up an absent key here does not create an empty list for it.
        for item in self.get(key, ()):
            adapter.fire_remove_event(item, None)
        collections.defaultdict.__delitem__(self, key)

    @collection.iterator
    def iterate(self):
        """Yield every member, across all of the per-key lists."""
        for group in self.values():
            for item in group:
                yield item

    @collection.converter
    def _convert(self, target):
        """Yield every member of ``target``, a mapping of key -> iterable."""
        for group in target.values():
            for item in group:
                yield item

    def update(self, k):
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError()


class AssociationGBK(_AssociationCollection):
    """Association-proxy view over a key -> list grouped collection.

    For each key, exposes the list of values obtained by applying the
    proxy's getter to the members of the underlying collection (``self.col``).
    """

    def __init__(self, lazy_collection, creator, value_attr, parent):
        """Set up the proxy using ``parent``'s default getter/setter.

        ``value_attr`` is accepted but not used by this implementation.
        """
        getter, setter = parent._default_getset(parent.collection_class)
        super(AssociationGBK, self).__init__(
            lazy_collection, creator, getter, setter, parent)

    def _create(self, key, value):
        """Build a new underlying member via ``creator(key, value)``."""
        return self.creator(key, value)

    def _get(self, object):
        """Return the proxied value of an underlying member."""
        return self.getter(object)

    def _set(self, object, key, value):
        """Delegate to the proxy's setter."""
        return self.setter(object, key, value)

    def __getitem__(self, key):
        """Return the list of proxied values stored under ``key``."""
        return [self._get(item) for item in self.col[key]]

    def __setitem__(self, key, value):
        """Replace the list under ``key`` with members created from ``value``."""
        self.col[key] = [self._create(key, item) for item in value]

    def add(self, key, item):
        """Create a member for ``(key, item)`` and add it to the collection."""
        self.col.add(self._create(key, item))

    def remove(self, key, item):
        """Remove the first member under ``key`` whose proxied value == ``item``.

        Raises ``ValueError`` if no such member exists.
        """
        for i, val in enumerate(self[key]):
            if val == item:
                self.col.remove(self.col[key][i])
                return
        raise ValueError("value not in list")

    def items(self):
        """Return a generator of ``(key, [proxied values])`` pairs."""
        return (
            (key, [self._get(item) for item in self.col[key]])
            for key in self.col
        )

    def update(self, kw):
        """Assign every ``key -> iterable`` pair of the mapping ``kw``."""
        for key, value in kw.items():
            self[key] = value

    def clear(self):
        """Clear the underlying collection."""
        self.col.clear()

    def copy(self):
        """Return a plain ``dict`` of key -> list of proxied values."""
        return dict(self.items())

    def __repr__(self):
        return repr(dict(self.items()))


# ------------------------------------------------------------------------------

Base = declarative_base()


class Person(Base):
    """A person whose addresses are grouped by the role of the association."""

    __tablename__ = 'person'

    id = Column(Integer, primary_key=True)

    # PersonToAddress objects grouped by their ``role``; removing one from
    # the collection deletes it (delete-orphan cascade).
    _addresses_by_role = relationship(
        "PersonToAddress",
        cascade="all, delete-orphan",
        collection_class=lambda: GroupByKeyCollection(
            keyfunc=lambda item: item.role
        )
    )

    # role -> list of Address objects, proxied through _addresses_by_role.
    addresses_by_role = association_proxy(
        "_addresses_by_role", "address",
        proxy_factory=AssociationGBK,
        creator=lambda k, v: PersonToAddress(role=k, address=v))


class PersonToAddress(Base):
    """Association row linking a Person to an Address under a role."""

    __tablename__ = 'person_to_address'

    id = Column(Integer, primary_key=True)
    person_id = Column(Integer, ForeignKey('person.id'))
    address_id = Column(Integer, ForeignKey('address.id'))
    role = Column(String)

    person = relationship("Person", backref="p2as")
    address = relationship("Address")


class Address(Base):
    """A named address."""

    __tablename__ = 'address'

    id = Column(Integer, primary_key=True)
    name = Column(String)


class TestPA(unittest.TestCase):
    """Exercise Person <-> Address association through each access path."""

    def setUp(self):
        self.e = create_engine("sqlite://", echo=True)
        Base.metadata.create_all(self.e)
        self.sess = Session(self.e)

    def tearDown(self):
        self.sess.close()
        self.e.dispose()

    def _assertAssociated(self, p1, a1, p2a):
        # custom collection
        self.assertEqual(p1._addresses_by_role["home"], [p2a])

        # association proxy
        self.assertEqual(p1.addresses_by_role["home"], [a1])

        # direct collection
        self.assertEqual(p1.p2as, [p2a])

        # foreign keys set correctly
        self.assertEqual(p2a.person_id, p1.id)
        self.assertEqual(p2a.address_id, a1.id)

    def _assertDeassociated(self, p1, a1, p2a):
        # when deassociating p1 from a1, there's no more association.
        # we want p2a to be deleted:
        self.assertEqual(self.sess.query(PersonToAddress).count(), 0)

        # list is empty
        self.assertEqual(p1.addresses_by_role["home"], [])

        # direct collection empty
        self.assertEqual(p1.p2as, [])

    def test_associate_via_collection(self):
        sess = self.sess
        p1 = Person()
        a1 = Address(name="Bucharest")
        p2a = PersonToAddress(address=a1, role="home")
        sess.add(p1)

        # associate using the collection
        p1._addresses_by_role.add(p2a)
        sess.commit()

        self._assertAssociated(p1, a1, p2a)

    def test_associate_via_scalar(self):
        sess = self.sess
        p1 = Person()
        a1 = Address(name="Bucharest")

        # here, p2a already refers to a1/p1, the "_addresses_by_role"
        # will be set up when it loads after a commit
        p2a = PersonToAddress(address=a1, person=p1, role="home")
        sess.add(p1)
        sess.commit()

        self._assertAssociated(p1, a1, p2a)

    def test_associate_via_association_proxy(self):
        sess = self.sess
        p1 = Person()
        a1 = Address(name="Bucharest")
        sess.add(p1)

        # associate using the association proxy
        p1.addresses_by_role.add("home", a1)
        sess.commit()

        p2a = p1._addresses_by_role["home"][0]
        self._assertAssociated(p1, a1, p2a)

    def test_deassociate(self):
        sess = self.sess
        p1 = Person()
        a1 = Address(name="Bucharest")
        sess.add(p1)
        p1.addresses_by_role.add("home", a1)
        sess.commit()

        p2a = p1._addresses_by_role["home"][0]
        self._assertAssociated(p1, a1, p2a)

        # now deassociate
        p1.addresses_by_role.remove("home", a1)
        sess.commit()

        self._assertDeassociated(p1, a1, p2a)


if __name__ == '__main__':
    unittest.main()
signature.asc
Description: Message signed with OpenPGP using GPGMail